Compare commits

...

129 Commits

Author SHA1 Message Date
James Hush
8375d299bc Revert 2025-11-28 13:53:12 +01:00
James Hush
98df964e68 fix: propagate skip_tts flag through LLM response frames
- Add skip_tts as an init parameter for TextFrame, LLMFullResponseStartFrame,
  and LLMFullResponseEndFrame instead of setting it post-init
- Update all LLM services to pass skip_tts when creating frames:
  - Anthropic, AWS (Bedrock, Nova Sonic, AgentCore), Google (Gemini, Gemini Live)
  - OpenAI (base, realtime), OpenAI Realtime Beta, SambaNova
- Add _get_skip_tts() helper method in LLMService base class
- Remove push_frame override that was setting skip_tts after frame creation
2025-11-28 13:40:09 +01:00
Aleix Conchillo Flaqué
b78eb5de6b Merge pull request #3148 from pipecat-ai/aleix/pipecat-0.0.96-update
update CHANGELOG for 0.0.96 with proper date
2025-11-26 17:21:31 -08:00
Aleix Conchillo Flaqué
95aa13beb1 update CHANGELOG for 0.0.96 with proper date 2025-11-26 17:16:54 -08:00
Mark Backman
88ce85342c Merge pull request #3147 from pipecat-ai/mb/fix-sagemaker-error-handling
Fix error handling in DeepramSageMakerSTTService
2025-11-26 20:15:45 -05:00
Mark Backman
bedd40ae8b Fix error handling in DeepramSageMakerSTTService 2025-11-26 20:12:31 -05:00
Mark Backman
fda327b3ee Merge pull request #3146 from pipecat-ai/mb/fix-aws-bedrock-region
fix: AWSBedrockLLMService was always set to us-east-1
2025-11-26 19:56:09 -05:00
Mark Backman
ace95b6e6d fix: AWSBedrockLLMService was always set to us-east-1 2025-11-26 19:52:04 -05:00
Aleix Conchillo Flaqué
26c5c28c5c Merge pull request #3145 from pipecat-ai/aleix/simli-enable-logging-param
SimliVideoService: add enable_logging input parameter
2025-11-26 16:49:12 -08:00
Aleix Conchillo Flaqué
81f862749d SimliVideoService: add enable_logging input parameter 2025-11-26 16:36:06 -08:00
Aleix Conchillo Flaqué
b8bf7b4132 Merge pull request #3143 from pipecat-ai/aleix/pipecat-0.0.96
update CHANGELOG for 0.0.96
2025-11-26 16:31:44 -08:00
Aleix Conchillo Flaqué
d90121ef3b update CHANGELOG for 0.0.96 2025-11-26 15:30:06 -08:00
Filipi da Silva Fuchter
d0b7b4fb0a Merge pull request #3144 from pipecat-ai/filipi/fix_flux_reconnection_issue
Fixed an issue with DeepgramFluxSTTService where it sometimes failed to reconnect.
2025-11-26 20:29:41 -03:00
Filipi Fuchter
4acc317923 Fixed an issue with DeepgramFluxSTTService where it sometimes failed to reconnect. 2025-11-26 20:23:03 -03:00
Filipi da Silva Fuchter
7caf5751ee Merge pull request #3084 from pipecat-ai/filipi/improve_error_handler
Improving error handler.
2025-11-26 18:40:44 -03:00
Filipi Fuchter
1330ef3ad6 Enhanced error handling across the framework.
Co-authored-by: Mark Backman <m.backman@gmail.com>
2025-11-26 18:34:25 -03:00
Mark Backman
9efb21d61e Merge pull request #3115 from pipecat-ai/mb/deepgram-websocket-tts
Update DeepgramTTSService to use Deepgram's Websocket TTS API
2025-11-26 13:30:52 -05:00
Mark Backman
6d93b8e9d8 Update DeepgramTTSService to use Deepgram's Websocket TTS API 2025-11-26 13:25:34 -05:00
Aleix Conchillo Flaqué
6f527e509e update CHANGELOG with FishAudioTTSService s1 model update 2025-11-26 10:22:59 -08:00
Aleix Conchillo Flaqué
6cf1d0417e Merge pull request #3136 from kcui5/patch-1
Update Fish Audio default model to s1
2025-11-26 10:19:26 -08:00
Mark Backman
19d8b0dfc2 Merge pull request #3011 from thsunkid/feat/add-cached-reasoning-tokens-metrics-to-opentel-spans 2025-11-26 07:45:33 -05:00
Kyle Cui
7fa0cbf2a9 Update Fish Audio default model to s1
Update default model from speech-1.5 to s1 for Fish Audio TTS service
2025-11-26 01:50:38 -08:00
Thu Nguyen
36c4bc2df2 Update changelog 2025-11-26 13:01:48 +07:00
Thu Nguyen
42be0183af Merge branch 'main' into feat/add-cached-reasoning-tokens-metrics-to-opentel-spans 2025-11-26 12:59:43 +07:00
Mark Backman
2607699664 Merge pull request #3125 from pipecat-ai/mb/fix-sagemaker-imports
fix: remove stt_sagemaker import from deepgram/__init__.py
2025-11-24 21:31:31 -05:00
Mark Backman
47fa3b8556 Merge pull request #3108 from fbarril/livekit-transport-helper
add livekit helper
2025-11-24 20:13:13 -05:00
Mark Backman
fa0100c38b fix: remove stt_sagemaker import from deepgram/__init__.py 2025-11-24 20:04:18 -05:00
kompfner
e5142c1210 Merge pull request #3113 from pipecat-ai/pk/agentcore-processor
Initial implementation of `AWSBedrockAgentCoreProcessor`
2025-11-24 19:10:44 -05:00
Paul Kompfner
5907b51c7d In AWSBedrockAgentCoreProcessor use self.create_task()/self.cancel_task() instead of using asyncio directly. 2025-11-24 18:53:39 -05:00
Paul Kompfner
9e4ec4f7f3 Implement AWSBedrockAgentCoreProcessor 2025-11-24 18:53:35 -05:00
fbarril
e2161ea63d add pyjwt as a livekit dependency 2025-11-24 23:30:11 +00:00
fbarril
7c81f66241 Merge remote-tracking branch 'origin/main' into livekit-transport-helper
# Conflicts:
#	CHANGELOG.md
#	uv.lock
2025-11-24 23:29:22 +00:00
fbarril
60da466379 add pyjwt as a livekit dependency 2025-11-24 23:27:32 +00:00
fbarril
12c29b71f3 add entry to CHANGELOG.md 2025-11-24 23:27:13 +00:00
Mark Backman
b52b108932 Merge pull request #3118 from pipecat-ai/mb/deepgram-stt-sagemaker
Add SageMaker BiDi client and DeepgramSageMakerSTTService
2025-11-24 16:47:25 -05:00
Mark Backman
a357ff0205 Alphabetize the project.optional-dependencies 2025-11-24 16:43:44 -05:00
Mark Backman
0ece8b5894 Add 07c Deepgram SageMaker example 2025-11-24 16:41:01 -05:00
Mark Backman
782b257bbb Add DeepgramSageMakerSTTService 2025-11-24 16:41:01 -05:00
Mark Backman
ab8dcd6ede Add SageMaker BiDi client 2025-11-24 16:41:00 -05:00
Mark Backman
012c2f7dde Merge pull request #3106 from pipecat-ai/mb/update-11labs-realtime-stt
Fix sample_rate issue in ElevenLabsRealtimeSTTService, add timestamps…
2025-11-24 08:10:30 -05:00
Mark Backman
87fdd8f006 Fix MiniMax changelog entries 2025-11-24 08:07:20 -05:00
Mark Backman
7bdac02837 Fix sample_rate issue in ElevenLabsRealtimeSTTService, add timestamps and logging 2025-11-24 08:06:33 -05:00
Mark Backman
861567bc59 Merge pull request #3119 from pipecat-ai/aleix/changelog-formatting
format CHANGELOG
2025-11-24 08:05:11 -05:00
Aleix Conchillo Flaqué
d0ff43134a format CHANGELOG 2025-11-23 17:48:57 -08:00
Dante Noguez
3458b74fc9 Fix 11labs realtime dynamic updates (#3117) 2025-11-22 10:02:37 -05:00
mattie ruth backman
a6202c4d1a Fixed CHANGELOG post rebase 2025-11-21 17:16:10 -05:00
mattie ruth backman
3c3141796a Overlooked Changelog updates 2025-11-21 17:16:10 -05:00
mattie ruth backman
8b8b57b09c Introduced new bot-output RTVI event to provide...
a best effort version of the bot's output

- The `RTVIObserver` now emits `bot-output` messages based off
  the new `AggregatedTextFrame`s (`bot-tts-text` and
  `bot-llm-text` are still supported and generated, but
  `bot-transcript` is now deprecated in lieu of this new, more
  thorough, message).
- The new `RTVIBotOutputMessage` includes the fields:
  - `spoken`: A boolean indicating whether the text was spoken by TTS
  - `aggregated_by`: A string representing how the text was aggregated
    ("sentence", "word", "my custom aggregation")
- Introduced new fields to `RTVIObserver` to support the new
  `bot-output` messaging:
  - `bot_output_enabled`: Defaults to True. Set to false to disable
    bot-output messages.
  - `skip_aggregator_types`: Defaults to `None`. Set to a list of
    strings that match aggregation types that should not be included
    in bot-output messages. (Ex. `credit_card`)
2025-11-21 17:16:10 -05:00
mattie ruth backman
4f30a48ecd Rime and Cartesia TTS Updates:
`CartesiaTTSService`:
 - Modified use of custom default text_aggregator to avoid deprecation warnings and push users
   towards use of transformers or the `LLMTextProcessor`
 - Added convenience methods for taking advantage of Cartesia's SSML tags: spell, emotion,
   pauses, volume, and speed.

`RimeTTSService`:
 - Modified use of custom default text_aggregator to avoid deprecation warnings and push users
   towards use of transformers or the `LLMTextProcessor`
 - Added convenience methods for taking advantage of Rime's customization options: spell,
   pauses, pronunciations, and inline speed control.
2025-11-21 17:16:10 -05:00
mattie ruth backman
ecbc41045c Added ability to transform text just-in-time before it gets sent to the TTS 2025-11-21 17:16:10 -05:00
mattie ruth backman
e1528d0f0c Added support to TTS services to skip sending text to the...
the actual TTS service to be spoken based on its aggregation type.
2025-11-21 17:16:10 -05:00
mattie ruth backman
6b6d760cf1 Introduced LLMTextProcessor and deprecatd custom text_aggregators in TTS
Introduced `LLMTextProcessor`: A new processor meant to allow customization for how
LLMTextFrames should be aggregated and considered. It's purpose is to turn
`LLMTextFrame`s into `AggregatedTextFrame`s. By default, a TTSService will still
aggregate `LLMTextFrame`s by sentence for the service to consume. However, if you
wish to override how the llm text is aggregated, you should no longer override the
TTS's internal text_aggregator, but instead, insert this processor between your LLM
and TTS in the pipeline.
2025-11-21 17:16:10 -05:00
mattie ruth backman
7a4372a909 Introduced a new AggregatedTextFrame Frame type that TTSTextFrame inherits from
This frame introduces an `aggregated_by` field to describe the type of text included
in the frame and allows unspoken groupings of text to be pushed through the pipeline
and treated similar to TTSTextFrames.
2025-11-21 17:16:10 -05:00
mattie ruth backman
0e820a01b9 Introduce append_to_context to TextFrames
Adding support for setting whether or not the text in the TextFrame
should be added to the LLM context (by the LLM assistant aggregator).
Defaults to `True`.
2025-11-21 17:16:10 -05:00
mattie ruth backman
24266c238f Augmented PatternPairAggregator so that matched patterns can...
be treated as their own aggregation, taking advantage of the new
ability to assign a type to an aggregation
2025-11-21 17:16:10 -05:00
mattie ruth backman
dcc20f86e1 Updated the BaseTextAggregator to categorize aggregations
Modified the BaseTextAggregator type so that when text gets aggregated, metadata can
be associated with it. Currently, that just means a `type`, so that the aggregation
can be classified or described. Changes made to support this:
  - **IMPORTANT**: Aggregators are now expected to strip leading/trailing white space
    characters before returning their aggregation from `aggregation()` or `.text`. This
    way all aggregators have a consistent contract allowing downstream use to know how
    to stitch aggregations back together
  - Introduced a new `Aggregation` dataclass to represent both the aggregated `text` and
    a string identifying the `type` of aggregation (ex. "sentence", "word", "my custom
    aggregation")
  - **BREAKING**: `BaseTextAggregator.text` now returns an `Aggregation` (instead of `str`).
    To update: `aggregated_text = myAggregator.text` -> `aggregated_text = myAggregator.text.text`
  - **BREAKING**: `BaseTextAggregator.aggregate()` now returns `Optional[Aggregation]`
    (instead of `Optional[str]`). To update:
      ```
      aggregation = myAggregator.aggregate(text)
      if (aggregation):
        print(f"successfully aggregated text: {aggregation.text}") // instead of {aggregation}
      ```
  - `SimpleTextAggregator`, `SkipTagsAggregator`, `PatternPairAggregator` updated to
     produce/consume `Aggregation` objects.
  - All uses of the above Aggregators have been updated accordingly.
2025-11-21 17:16:10 -05:00
fbarril
ec8964425a add livekit helper 2025-11-21 00:27:57 +00:00
Vanessa Pyne
26918728df Merge pull request #3096 from pipecat-ai/vp-minimax-2962-v2
minimax 2962 language updates
2025-11-20 10:41:35 -06:00
vipyne
954849379b cleanup 2025-11-20 10:41:09 -06:00
vipyne
06542a2dbc Update CHANGELOG 2025-11-20 10:41:09 -06:00
Vanessa Pyne
59d40eac45 Update src/pipecat/services/minimax/tts.py
Co-authored-by: Mark Backman <mark@daily.co>

add warning
2025-11-20 10:41:09 -06:00
vipyne
17cf6c56cf minimax updates
some `debug`s -> `trace`s

add western US base_url to docs

ensure error_message is defined

add deprecation warning for `english_normalization` param
2025-11-20 10:41:09 -06:00
minimax
616e6ba351 docs(minimax): add API endpoint comment for west US region 2025-11-20 10:41:08 -06:00
minimax
f3cb5e0106 feat(minimax): comprehensive updates to TTS service
- Add support for speech-2.6-hd and speech-2.6-turbo models
- Add 16 new languages (total 40): Afrikaans, Bulgarian, Catalan, Danish, Persian, Filipino, Hebrew, Croatian, Hungarian, Malay, Norwegian, Nynorsk, Slovak, Slovenian, Swedish, Tamil
- Add new emotions: calm and fluent
- Add new parameters: text_normalization (renamed from english_normalization), latex_read, force_cbr, exclude_aggregated_audio, subtitle_enable, subtitle_type
- Extract trace_id from response headers for all requests
- Improve error handling for non-streaming error responses
- Add detailed extra_info logging (audio_length, audio_size, usage_characters, word_count)
- Add validation warnings for language/model compatibility
- Fix silent error issue where HTTP 200 responses with errors were ignored

BREAKING CHANGE: Renamed parameter english_normalization to text_normalization
2025-11-20 10:41:08 -06:00
Aleix Conchillo Flaqué
c89f230c99 fix CHANGELOG 2025-11-20 08:40:30 -08:00
Aleix Conchillo Flaqué
69cd5716cd Merge pull request #3102 from pipecat-ai/aleix/daily-python-0.22.0
pyproject: update daily-python to 0.22.0
2025-11-20 08:35:39 -08:00
Mark Backman
ab58f72322 Merge pull request #3101 from hwuiwon/hw/inworld-talking-speed
feat: Add speaking rate control to Inworld TTS service.
2025-11-20 09:50:55 -05:00
Hwuiwon Kim
ead361f665 fix 2025-11-20 07:45:13 -05:00
Aleix Conchillo Flaqué
fa6b8851ed pyproject: update daily-python to 0.22.0 2025-11-19 21:56:38 -08:00
Hwuiwon Kim
1cc69d475d feat: Add speaking rate control to Inworld TTS service & fix param cases 2025-11-19 22:57:53 -05:00
Mark Backman
51bdd8b728 Merge pull request #3097 from hwuiwon/fix-typo
Fix typo in STT event handler documentation
2025-11-19 17:10:32 -05:00
Hwuiwon Kim
30ff488714 Fix typo in event handler documentation 2025-11-19 17:04:07 -05:00
Vanessa Pyne
510f3df6b7 Merge pull request #3091 from pipecat-ai/vp-fix-mcp-examples
update MCP foundational examples
2025-11-19 10:35:08 -06:00
vipyne
68292bd75f rename MCP foundational examples 2025-11-19 10:34:13 -06:00
vipyne
42423bff41 update MCP foundational examples 2025-11-19 10:29:18 -06:00
Aleix Conchillo Flaqué
c3d2a25229 Merge pull request #3082 from pipecat-ai/aleix/pipecat-0.0.95
update CHANGELOG for 0.0.95
2025-11-18 21:17:07 -08:00
Aleix Conchillo Flaqué
cf1a9c1548 update CHANGELOG for 0.0.95 2025-11-18 21:14:27 -08:00
Aleix Conchillo Flaqué
51ba245e10 scripts(evals): fix EVAL_CONVERSATION/EVAL_WEATHER eval 2025-11-18 21:14:27 -08:00
Aleix Conchillo Flaqué
39b4e61837 SimliVideoService: fix connection issue 2025-11-18 19:41:47 -08:00
Aleix Conchillo Flaqué
ceaf53fdb0 LLMContext: async create_image_message/create_audio_message fixes 2025-11-18 19:41:13 -08:00
Aleix Conchillo Flaqué
f93276c64f Merge pull request #3090 from pipecat-ai/revert_function_calling_pr
Reverting: Ensure that the function call results respect the previous LLM context
2025-11-18 19:40:58 -08:00
Mark Backman
62a0f0c0f5 Merge pull request #3070 from ivaaan/hume-timestamps 2025-11-18 19:56:20 -05:00
Filipi Fuchter
793aca6b8b Revert "Ensure that the function call results respect the previous LLM context."
This reverts commit a510b276e6.
2025-11-18 21:38:49 -03:00
Filipi Fuchter
1fcaf3a4bf Revert "Searching in both _function_calls_context_messages and context messages when updating the result."
This reverts commit fccc91e923.
2025-11-18 21:38:49 -03:00
ivaaan
6484855139 fix changelog 2025-11-18 21:47:46 +01:00
ivaaan
771469b834 fix changelog 2025-11-18 21:39:29 +01:00
kompfner
a60618b0ca Merge pull request #3080 from pipecat-ai/pk/assistant-aggregator-handles-mixed-includes-inter-frame-spaces-text
`LLMAssistantAggregator` now properly aggregates text that might be a…
2025-11-18 15:24:27 -05:00
Paul Kompfner
3d21faaac2 LLMAssistantAggregator now properly aggregates text that might be a mix of includes_inter_frame_spaces=True and includes_inter_frame_spaces=False frames 2025-11-18 15:12:25 -05:00
ivaaan
f325eeb95b rm TranscriptProcessor 2 2025-11-18 20:41:10 +01:00
ivaaan
4c3fd42b1c fix changelog 2025-11-18 20:36:45 +01:00
ivaaan
c2309efd7e rm TranscriptProcessor 2025-11-18 20:35:09 +01:00
Ivan A
4ae1819645 Update src/pipecat/services/hume/tts.py
Co-authored-by: Mark Backman <m.backman@gmail.com>
2025-11-18 20:30:44 +01:00
Ivan A
a38f208135 Update examples/foundational/07ae-interruptible-hume.py
Co-authored-by: Mark Backman <m.backman@gmail.com>
2025-11-18 20:30:28 +01:00
Mark Backman
d1eb837890 Merge pull request #3081 from pipecat-ai/mb/fix-30-tts-text-frame-log
Fix foundational 30 example to output TTSTextFrames synced to audio
2025-11-18 14:10:56 -05:00
Mark Backman
153201542b Fix foundational 30 example to output TTSTextFrames synced to audio 2025-11-18 13:29:06 -05:00
Filipi da Silva Fuchter
9137e50043 Merge pull request #3053 from pipecat-ai/filipi/function_calls
Ensure that the function call results respect the previous LLM context.
2025-11-18 14:59:01 -03:00
Ivan A
8dbe119a73 Merge branch 'main' into hume-timestamps 2025-11-18 18:38:24 +01:00
ivaaan
26f96d0be8 upd example 2025-11-18 18:31:38 +01:00
ivaaan
9944e6faf0 upd service based on Mark's suggestions 2025-11-18 18:25:53 +01:00
Aleix Conchillo Flaqué
c1573c1f76 Merge pull request #3078 from pipecat-ai/aleix/llm-context-create-image-audio-async
LLMContext: create_image_message/create_audio_message are now async
2025-11-18 09:06:51 -08:00
Aleix Conchillo Flaqué
9f45ad4d2e LLMContext: create_image_message/create_audio_message are now async 2025-11-18 09:04:40 -08:00
Filipi Fuchter
fccc91e923 Searching in both _function_calls_context_messages and context messages when updating the result. 2025-11-18 11:50:28 -03:00
Filipi Fuchter
a510b276e6 Ensure that the function call results respect the previous LLM context. 2025-11-18 11:37:57 -03:00
Mark Backman
6481094638 Merge pull request #3058 from pipecat-ai/mb/add-camera-screen-support-smallwebrtc
Add camera and screen capture support to dev runner for SmallWebRTC
2025-11-18 09:22:36 -05:00
Mark Backman
3132e12265 Add camera and screen capture support to dev runner for SmallWebRTC 2025-11-18 09:19:13 -05:00
Aleix Conchillo Flaqué
12af3f79d0 Merge pull request #3060 from pipecat-ai/aleix/consumer-queue-frames
ConsumerProcessor: queue frames internally instead of pushing them
2025-11-18 00:54:18 -08:00
Aleix Conchillo Flaqué
4835617b16 ConsumerProcessor: queue frames internally instead of pushing them 2025-11-17 23:52:09 -08:00
Aleix Conchillo Flaqué
9283108240 Merge pull request #3073 from pipecat-ai/aleix/base-text-filter-only-filter
BaseTextFilter: only require subclasses to implement filter()
2025-11-17 23:29:26 -08:00
kompfner
515eaeeb1a Merge pull request #3074 from pipecat-ai/pk/tweak-moondream-example
Update Moondream example so that Moondream service output makes it in…
2025-11-17 16:52:18 -05:00
Paul Kompfner
5095fc6a64 Update Moondream example so that Moondream service output makes it into the context, even if the TTS service is disabled 2025-11-17 15:16:19 -05:00
Aleix Conchillo Flaqué
7eedb33d50 BaseTextFilter: only require subclasses to implement filter() 2025-11-17 11:23:47 -08:00
Filipi da Silva Fuchter
47f78df497 Merge pull request #3071 from pipecat-ai/filipi/small_webrtc_custom_data
Passing the custom request_data to the SmallWebRTCRunnerArguments body.
2025-11-17 15:50:11 -03:00
Filipi Fuchter
74154b26a2 Mentioning the SmallWebRTCTransport fix in the readme. 2025-11-17 15:39:07 -03:00
Filipi Fuchter
0c3c26b7b8 Passing the custom request_data to the SmallWebRTCRunnerArguments body. 2025-11-17 15:20:09 -03:00
kompfner
64417ef4ff Merge pull request #3061 from pipecat-ai/pk/greatly-simplify-inter-frame-spaces-logic
D'oh! My TTS "inter-frame-spaces" logic was *way* overcomplicated (an…
2025-11-17 10:47:56 -05:00
Paul Kompfner
f3b254e335 D'oh! My TTS "inter-frame-spaces" logic was *way* overcomplicated (and fundamentally mistaken, though it happened to work)
Now:
- For TTS word-by-word output and `TTSSpeakFrames`: `TTSTextFrame`s' have `includes_inter_frame_spaces=False`.
- For all other TTS output: `TTSTextFrame` pass through the received text frames' `includes_inter_frame_spaces` value. So far, this value has always been `True`: LLMs send text chunks already containing all necessary spaces.
- `LLMTextFrame`s set `includes_inter_frame_spaces=False` at init time, per the aforementioned assumption.
2025-11-17 10:14:28 -05:00
Filipi da Silva Fuchter
f27119a712 Merge pull request #3069 from pipecat-ai/filipi/fix_riva
Fixing RivaTTSService error handler.
2025-11-17 11:48:15 -03:00
ivaaan
2a51d0f1e5 add changelog 2025-11-17 15:20:06 +01:00
ivaaan
9156e21727 fix formatting 2025-11-17 14:00:03 +01:00
Filipi da Silva Fuchter
a5145be16e Merge pull request #3038 from pipecat-ai/filipi/flux_improvements
Deepgram Flux improvements
2025-11-17 09:57:43 -03:00
Filipi Fuchter
b104a59b10 Mentioning the Deepgram Flux improvements in the changelog. 2025-11-17 09:54:39 -03:00
Filipi Fuchter
04dbbabc03 Introduced a minimum confidence parameter in DeepgramFluxSTTService to avoid generating transcriptions below a defined threshold. 2025-11-17 09:54:30 -03:00
Filipi Fuchter
19cc0177b8 Refactored DeepgramFluxSTTService to automatically reconnect if sending a message fails. 2025-11-17 09:54:20 -03:00
Filipi Fuchter
77cd106795 Extracted the logic for retrying connections, and create a new send_with_retry method inside WebSocketService. 2025-11-17 09:54:08 -03:00
ivaaan
71869a116d fix errors 2025-11-17 13:51:04 +01:00
ivaaan
2f2bde9856 add timestamps to example 2025-11-17 13:40:03 +01:00
ivaaan
7de8838deb add word-level timestamp support to Hume service 2025-11-17 13:25:12 +01:00
Filipi Fuchter
9bf88bbf14 Fixing RivaTTSService error handler. 2025-11-17 07:43:30 -03:00
Thu Nguyen
35593b8574 Add cached and reasoning token metrics to OpenTelemetry spans 2025-11-09 00:38:30 +07:00
129 changed files with 3811 additions and 1263 deletions

View File

@@ -5,22 +5,340 @@ All notable changes to **Pipecat** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
## [0.0.96] - 2025-11-26 🦃 "Happy Thanksgiving!" 🦃
### Added
- Enhanced error handling across the framework:
- Added `on_error` callback to `FrameProcessor` for centralized error
handling.
- Renamed `push_error(error: ErrorFrame)` to `push_error_frame(error: ErrorFrame)`
for clarity.
- Added new `push_error` method for simplified error reporting:
```python
async def push_error(error_msg: str,
exception: Optional[Exception] = None,
fatal: bool = False)
```
- Standardized error logging by replacing `logger.exception` calls with
`logger.error` throughout the codebase.
- Added `cache_read_input_tokens`, `cache_creation_input_tokens` and
`reasoning_tokens` to OTel spans for LLM call
- Added `LiveKitRESTHelper` utility class for managing LiveKit rooms via REST API.
- Added `DeepgramSageMakerSTTService` which connects to a SageMaker hosted
Deepgram STT model. Added `07c-interruptible-deepgram-sagemaker.py`
foundational example.
- Added `SageMakerBidiClient` to connect to SageMaker hosted BiDi compatible
services.
- Added support for `include_timestamps` and `enable_logging` in
`ElevenLabsRealtimeSTTService`. When `include_timestamps` is enabled,
timestamp data is included in the `TranscriptionFrame`'s `result`
parameter.
- Added optional speaking rate control to `InworldTTSService`.
- Introduced a new `AggregatedTextFrame` type to support passing text along with
an `aggregated_by` field to describe the type of text
included. `TTSTextFrame`s now inherit from `AggregatedTextFrame`. With this
inheritance, an observer can watch for `AggregatedTextFrame`s to accumlate the
perceived output and determine whether or not the text was spoken based on if
that frame is also a `TTSTextFrame`.
With this frame, the llm token stream can be transformed into custom
composable chunks, allowing for aggregation outside the TTS service. This
makes it possible to listen for or handle those aggregations and sets the
stage for doing things like composing a best effort of the perceived llm
output in a more digestable form and to do so whether or not it is processed
by a TTS or if even a TTS exists.
- Introduced `LLMTextProcessor`: A new processor meant to allow customization
for how LLMTextFrames should be aggregated and considered. It's purpose is to
turn `LLMTextFrame`s into `AggregatedTextFrame`s. By default, a TTSService
will still aggregate `LLMTextFrame`s by sentence for the service to
consume. However, if you wish to override how the llm text is aggregated, you
should no longer override the TTS's internal text_aggregator, but instead,
insert this processor between your LLM and TTS in the pipeline.
- New `bot-output` RTVI message to represent what the bot actually "says".
- The `RTVIObserver` now emits `bot-output` messages based off the new
`AggregatedTextFrame`s (`bot-tts-text` and `bot-llm-text` are still
supported and generated, but `bot-transcript` is now deprecated in lieu of
this new, more thorough, message).
- The new `RTVIBotOutputMessage` includes the fields:
- `spoken`: A boolean indicating whether the text was spoken by TTS
- `aggregated_by`: A string representing how the text was aggregated
("sentence", "word", "my custom aggregation")
- Introduced new fields to `RTVIObserver` to support the new `bot-output`
messaging:
- `bot_output_enabled`: Defaults to True. Set to false to disable bot-output
messages.
- `skip_aggregator_types`: Defaults to `None`. Set to a list of strings that
match aggregation types that should not be included in bot-output
messages. (Ex. `credit_card`)
- Introduced new methods, `add_text_transformer()` and
`remove_text_transformer()`, to `RTVIObserver` to support providing (and
subsequently removing) callbacks for various types of aggregations (or all
aggregations with `*`) that can modify the text before being sent as a
`bot-output` or `tts-text` message. (Think obscuring the credit card or
inserting extra detail the client might want that the context doesn't need.)
- In `MiniMaxHttpTTSService`:
- Added support for speech-2.6-hd and speech-2.6-turbo models
- Added languages: Afrikaans, Bulgarian, Catalan, Danish, Persian, Filipino,
Hebrew, Croatian, Hungarian, Malay, Norwegian, Nynorsk, Slovak, Slovenian,
Swedish, and Tamil
- Added new emotions: calm and fluent
- Added `enable_logging` to `SimliVideoService` input parameters. It's disabled
by default.
### Changed
- Updated `FishAudioTTSService` default model to `s1`.
- Updated `DeepgramTTSService` to use Deepgram's TTS websocket API. ⚠️ This is
a potential breaking change, which only affects you if you're self-hosting
`DeepgramTTSService`. The new service uses Websockets and improves TTFB
latency.
- Updated `daily-python` to 0.22.0.
- `BaseTextAggregator` changes:
Modified the BaseTextAggregator type so that when text gets aggregated,
metadata can be associated with it. Currently, that just means a `type`, so
that the aggregation can be classified or described. Changes made to support
this:
- ⚠️ IMPORTANT: Aggregators are now expected to strip leading/trailing white
space characters before returning their aggregation from `aggregation()` or
`.text`. This way all aggregators have a consistent contract allowing
downstream use to know how to stitch aggregations back together.
- Introduced a new `Aggregation` dataclass to represent both the aggregated
`text` and a string identifying the `type` of aggregation (ex. "sentence",
"word", "my custom aggregation")
- ⚠️ Breaking change: `BaseTextAggregator.text` now returns an `Aggregation`
(instead of `str`).
Before:
```python
aggregated_text = myAggregator.text
```
Now:
```python
aggregated_text = myAggregator.text.text
```
- ⚠️ Breaking change: `BaseTextAggregator.aggregate()` now returns
`Optional[Aggregation]` (instead of `Optional[str]`).
Before:
```python
aggregation = myAggregator.aggregate(text)
print(f"successfully aggregated text: {aggregation}")
```
Now:
```python
aggregation = myAggregator.aggregate(text)
if aggregation:
print(f"successfully aggregated text: {aggregation.text}")
```
- `SimpleTextAggregator`, `SkipTagsAggregator`, `PatternPairAggregator`
updated to produce/consume `Aggregation` objects.
- All uses of the above Aggregators have been updated accordingly.
- Augmented the `PatternPairAggregator` so that matched patterns can be treated
as their own aggregation, taking advantage of the new. To that end:
- Introduced a new, preferred version of `add_pattern` to support a new option
for treating a match as a separate aggregation returned from
`aggregate()`. This replaces the now deprecated `add_pattern_pair` method
and you provide a `MatchAction` in lieu of the `remove_match` field.
- `MatchAction` enum: `REMOVE`, `KEEP`, `AGGREGATE`, allowing customization
for how a match should be handled.
- `REMOVE`: The text along with its delimiters will be removed from the
streaming text. Sentence aggregation will continue on as if this text
did not exist.
- `KEEP`: The delimiters will be removed, but the content between them
will be kept. Sentence aggregation will continue on with the internal
text included.
- `AGGREGATE`: The delimiters will be removed and the content between will
be treated as a separate aggregation. Any text before the start of the
pattern will be returned early, whether or not a complete sentence was
found. Then the pattern will be returned. Then the aggregation will
continue on sentence matching after the closing delimiter is found. The
content between the delimiters is not aggregated by sentence. It is
aggregated as one single block of text.
- `PatternMatch` now extends `Aggregation` and provides richer info to
handlers.
- ⚠️ Breaking change: The `PatternMatch` type returned to handlers registered
via `on_pattern_match` has been updated to subclass from the new
`Aggregation` type, which means that `content` has been replaced with
`text` and `pattern_id` has been replaced with `type`:
```python
async dev on_match_tag(match: PatternMatch):
pattern = match.type # instead of match.pattern_id
text = match.text # instead of match.content
```
- `TextFrame` now includes the field `append_to_context` to support setting
whether or not the encompassing text should be added to the LLM context (by
the LLM assistant aggregator). It defaults to `True`.
- `TTSService` base class updates:
- `TTSService`s now accept a new `skip_aggregator_types` to avoid speaking
certain aggregation types (now determined/returned by the aggregator)
- Introduced the ability to do a just-in-time transform of text before it gets
sent to the TTS service via callbacks you can set up via a new init field,
`text_transforms` or a new method `add_text_transformer()`. This makes it
possible to do things like introduce TTS-specific tags for spelling or
emotion or change the pronunciation of something on the
fly. `remove_text_transformer` has also been added to support removing a
registered transform callback.
- TTS services push `AggregatedTextFrame` in addition to `TTSTextFrame`s when
either an aggregation occurs that should not be spoken or when the TTS
service supports word-by-word timestamping. In the latter case, the
`TTSService` preliminarily generates an `AggregatedTextFrame`, aggregated by
sentence to generate the full sentence content as early as possible.
- Updated `CartesiaTTSService`:
- Modified use of custom default text_aggregator to avoid deprecation warnings
and push users towards use of transformers or the `LLMTextProcessor`
- Added convenience methods for taking advantage of Cartesia's SSML tags:
spell, emotion, pauses, volume, and speed.
- Updated `RimeTTSService`:
- Modified use of custom default text_aggregator to avoid deprecation warnings
and push users towards use of transformers or the `LLMTextProcessor`
- Added convenience methods for taking advantage of Rime's customization
options: spell, pauses, pronunciations, and inline speed control.
### Deprecated
- The TTS constructor field, `text_aggregator` is deprecated in favor of the new
`LLMTextProcessor`. TTSServices still have an internal aggregator for support
of default behavior, but if you want to override the aggregation behavior, you
should use the new processor.
- The RTVI `bot-transcription` event is deprecated in favor of the new
`bot-output` message which is the canonical representation of bot output
(spoken or not). The code still emits a transcription message for backwards
compatibility while transition occurs.
- Deprecated `add_pattern_pair` in the `PatternPairAggregator` which takes a
`pattern_id` and `remove_match` field in favor of the new `add_pattern` method
which takes a `type` and an `action`
- `english_normalization` input parameter for `MiniMaxHttpTTSService` is
deprecated, use `test_normalization` instead.
### Fixed
- Fixed an issue in `AWSBedrockLLMService` where the `aws_region` arg was
always set to `us-east-1`.
- Fixed an issue with `DeepgramFluxSTTService` where it sometimes failed to reconnect.
- Fixed an issue in `ElevenLabsRealtimeSTTService` where dynamic language
updates were not working.
- Fixed an issue in `ElevenLabsRealtimeSTTService` where setting the sample
rate would result in transcripts failing.
- Fixed `InworldTTSService` audio config payload to use camelCase keys expected
by the Inworld API.
## [0.0.95] - 2025-11-18
### Added
- Added ai-coustics integrated VAD (`AICVADAnalyzer`) with `AICFilter` factory and
example wiring; leverages the enhancement model for robust detection with no
ONNX dependency or added processing complexity.
- Added a watchdog to `DeepgramFluxSTTService` to prevent dangling tasks in case the
user was speaking and we stop receiving audio.
- Introduced a minimum confidence parameter in `DeepgramFluxSTTService` to avoid
generating transcriptions below a defined threshold.
- Added `ElevenLabsRealtimeSTTService` which implements the Realtime STT
service from ElevenLabs.
- Added a `TTSService.includes_inter_frame_spaces` property getter, so that TTS
services that subclass `TTSService` can indicate whether the text in the
`TTSTextFrame`s they push already contain any necessary inter-frame spaces.
- Added word-level timestamps support to Hume TTS service
### Changed
- ⚠️ Breaking change: `LLMContext.create_image_message()`,
`LLMContext.create_audio_message()`, `LLMContext.add_image_frame_message()`
and `LLMContext.add_audio_frames_message()` are now async methods. This fixes
an issue where the asyncio event loop would be blocked while encoding audio or
images.
- `ConsumerProcessor` now queues frames from the producer internally instead of
pushing them directly. This allows us to subclass consumer processors and
manipulate frames before they are pushed.
- `BaseTextFilter` only require subclasses to implement the `filter()` method.
- Extracted the logic for retrying connections, and create a new `send_with_retry`
method inside `WebSocketService`.
- Refactored `DeepgramFluxSTTService` to automatically reconnect if sending a
message fails.
- Updated all STT and TTS services to use consistent error handling pattern with
`push_error()` method for better pipeline error event integration.
- Added support for `maybe_capture_participant_camera()` and
`maybe_capture_participant_screen()` for `SmallWebRTCTransport` in the runner
utils.
- Added Hindi support for Rime TTS services.
- Updated `GeminiTTSService` to use Google Cloud Text-to-Speech streaming API
@@ -40,6 +358,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- Fixed a `SimliVideoService` connection issue.
- Fixed an issue in the `Runner` where, when using `SmallWebRTCTransport`, the
`request_data` was not being passed to the `SmallWebRTCRunnerArguments` body.
- Fixed subtle issue of assistant context messages ending up with double spaces
between words or sentences.
@@ -54,12 +377,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Prevented `HeyGenVideoService` from automatically disconnecting after 5 minutes.
### Added
- Added ai-coustics integrated VAD (`AICVADAnalyzer`) with `AICFilter` factory and
example wiring; leverages the enhancement model for robust detection with no
ONNX dependency or added processing complexity.
## [0.0.94] - 2025-11-10
### Changed

View File

@@ -44,6 +44,7 @@ DAILY_SAMPLE_ROOM_URL=https://...
# Deepgram
DEEPGRAM_API_KEY=...
SAGEMAKER_ENDPOINT_NAME=...
# DeepSeek
DEEPSEEK_API_KEY=...

View File

@@ -13,24 +13,29 @@ from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.frames.frames import LLMRunFrame, TTSTextFrame
from pipecat.observers.loggers.debug_log_observer import DebugLogObserver, FrameEndpoint
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
)
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.hume.tts import HUME_SAMPLE_RATE, HumeTTSService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_output import BaseOutputTransport
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
@@ -88,7 +93,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
stt,
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
tts, # TTS (HumeTTSService with word timestamps)
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
@@ -102,7 +107,14 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
audio_out_sample_rate=HUME_SAMPLE_RATE,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
observers=[RTVIObserver(rtvi)],
observers=[
RTVIObserver(rtvi),
DebugLogObserver(
frame_types={
TTSTextFrame: (BaseOutputTransport, FrameEndpoint.SOURCE),
}
),
],
)
@rtvi.event_handler("on_client_ready")
@@ -112,6 +124,9 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
logger.info(
"💡 Word timestamps are enabled! Watch the console for TTSTextFrame logs showing each word with its PTS."
)
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMRunFrame()])

View File

@@ -52,7 +52,10 @@ transport_params = {
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramFluxSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
stt = DeepgramFluxSTTService(
api_key=os.getenv("DEEPGRAM_API_KEY"),
params=DeepgramFluxSTTService.InputParams(min_confidence=0.3),
)
tts = DeepgramTTSService(api_key=os.getenv("DEEPGRAM_API_KEY"), voice="aura-2-andromeda-en")

View File

@@ -9,7 +9,6 @@ import os
from dotenv import load_dotenv
from loguru import logger
from mcp.client.session_group import SseServerParameters
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
@@ -23,16 +22,16 @@ from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.anthropic.llm import AnthropicLLMService
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.mcp_service import MCPClient
from pipecat.services.aws.llm import AWSBedrockLLMService
from pipecat.services.deepgram.stt_sagemaker import DeepgramSageMakerSTTService
from pipecat.services.deepgram.tts import DeepgramTTSService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
@@ -61,56 +60,42 @@ transport_params = {
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
# Initialize Deepgram SageMaker STT Service
# This requires:
# - AWS credentials configured (via environment variables or AWS CLI)
# - A deployed SageMaker endpoint with Deepgram model
stt = DeepgramSageMakerSTTService(
endpoint_name=os.getenv("SAGEMAKER_ENDPOINT_NAME"),
region=os.getenv("AWS_REGION"),
)
llm = AnthropicLLMService(
api_key=os.getenv("ANTHROPIC_API_KEY"), model="claude-3-7-sonnet-latest"
tts = DeepgramTTSService(api_key=os.getenv("DEEPGRAM_API_KEY"), voice="aura-2-andromeda-en")
llm = AWSBedrockLLMService(
aws_region=os.getenv("AWS_REGION"),
model="us.amazon.nova-pro-v1:0",
params=AWSBedrockLLMService.InputParams(temperature=0.8),
)
try:
# https://docs.mcp.run/integrating/tutorials/mcp-run-sse-openai-agents/
mcp = MCPClient(server_params=SseServerParameters(url=os.getenv("MCP_RUN_SSE_URL")))
except Exception as e:
logger.error(f"error setting up mcp")
logger.exception("error trace:")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points. Respond to what the user said in a creative and helpful way.",
},
]
tools = {}
try:
tools = await mcp.register_tools(llm)
except Exception as e:
logger.error(f"error registering tools")
logger.exception("error trace:")
system = f"""
You are a helpful LLM in a WebRTC call.
Your goal is to demonstrate your capabilities in a succinct way.
You have access to a number of tools provided by mcp.run. Use any and all tools to help users.
Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points.
Respond to what the user said in a creative and helpful way.
When asked for today's date, use 'https://www.datetoday.net/'.
Don't overexplain what you are doing.
Just respond with short sentences when you are carrying out tool calls.
"""
messages = [{"role": "system", "content": system}]
context = LLMContext(messages, tools)
context = LLMContext(messages)
context_aggregator = LLMContextAggregatorPair(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
context_aggregator.user(), # User spoken responses
stt, # STT
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses and tool context
context_aggregator.assistant(), # Assistant spoken responses
]
)
@@ -125,8 +110,9 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected: {client}")
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
@@ -146,14 +132,6 @@ async def bot(runner_args: RunnerArguments):
if __name__ == "__main__":
if not os.getenv("MCP_RUN_SSE_URL"):
logger.error(
f"Please set MCP_RUN_SSE_URL environment variable for this example. See https://mcp.run"
)
import sys
sys.exit(1)
from pipecat.runner.run import main
main()

View File

@@ -110,7 +110,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
# Kick off the conversation.
image = Image.open(image_path)
message = LLMContext.create_image_message(
message = await LLMContext.create_image_message(
image=image.tobytes(),
format="RGB",
size=image.size,

View File

@@ -110,7 +110,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
# Kick off the conversation.
image = Image.open(image_path)
message = LLMContext.create_image_message(
message = await LLMContext.create_image_message(
image=image.tobytes(),
format="RGB",
size=image.size,

View File

@@ -117,7 +117,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
# Kick off the conversation.
image = Image.open(image_path)
message = LLMContext.create_image_message(
message = await LLMContext.create_image_message(
image=image.tobytes(),
format="RGB",
size=image.size,

View File

@@ -110,7 +110,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
# Kick off the conversation.
image = Image.open(image_path)
message = LLMContext.create_image_message(
message = await LLMContext.create_image_message(
image=image.tobytes(),
format="RGB",
size=image.size,

View File

@@ -15,14 +15,21 @@ from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame, UserImageRequestFrame
from pipecat.frames.frames import (
Frame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMRunFrame,
TextFrame,
UserImageRequestFrame,
)
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.frame_processor import FrameDirection
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import (
create_transport,
@@ -66,6 +73,27 @@ async def fetch_user_image(params: FunctionCallParams):
# await params.result_callback({"result": "Image is being captured."})
class MoondreamTextFrameWrapper(FrameProcessor):
"""Wraps Moondream-provided TextFrames with LLM response start/end frames.
This processor detects TextFrames and automatically wraps them with
LLMFullResponseStartFrame and LLMFullResponseEndFrame to provide proper
response boundaries for downstream processors.
"""
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
# If we receive a TextFrame, wrap it with response start/end frames
if isinstance(frame, TextFrame):
await self.push_frame(LLMFullResponseStartFrame(), direction)
await self.push_frame(frame, direction)
await self.push_frame(LLMFullResponseEndFrame(), direction)
else:
# For all other frames, just pass them through
await self.push_frame(frame, direction)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
@@ -130,6 +158,12 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
# If you run into weird description, try with use_cpu=True
moondream = MoondreamService()
# Wrap TextFrames with LLM response start/end frames, which makes Moondream
# output be treated like LLM responses for the purpose of context
# aggregation. Without this, the assistant context aggregator would ignore
# Moondream output (if the TTS service is disabled).
moondream_text_wrapper = MoondreamTextFrameWrapper()
pipeline = Pipeline(
[
transport.input(), # Transport user input
@@ -137,7 +171,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
context_aggregator.user(), # User responses
ParallelPipeline(
[llm], # LLM
[moondream],
[moondream, moondream_text_wrapper],
),
tts, # TTS
transport.output(), # Transport bot output

View File

@@ -391,7 +391,7 @@ class AudioAccumulator(FrameProcessor):
)
self._user_speaking = False
context = LLMContext()
context.add_audio_frames_message(audio_frames=self._audio_frames)
await context.add_audio_frames_message(audio_frames=self._audio_frames)
await self.push_frame(LLMContextFrame(context=context))
elif isinstance(frame, InputAudioRawFrame):
# Append the audio frame to our buffer. Treat the buffer as a ring buffer, dropping the oldest

View File

@@ -150,7 +150,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
LLMLogObserver(),
DebugLogObserver(
frame_types={
TTSTextFrame: (BaseOutputTransport, FrameEndpoint.DESTINATION),
TTSTextFrame: (BaseOutputTransport, FrameEndpoint.SOURCE),
UserStartedSpeakingFrame: (BaseInputTransport, FrameEndpoint.SOURCE),
EndFrame: None,
}

View File

@@ -62,7 +62,11 @@ from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
from pipecat.utils.text.pattern_pair_aggregator import PatternMatch, PatternPairAggregator
from pipecat.utils.text.pattern_pair_aggregator import (
MatchAction,
PatternMatch,
PatternPairAggregator,
)
load_dotenv(override=True)
@@ -106,16 +110,16 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
pattern_aggregator = PatternPairAggregator()
# Add pattern for voice switching
pattern_aggregator.add_pattern_pair(
pattern_id="voice_tag",
pattern_aggregator.add_pattern(
type="voice",
start_pattern="<voice>",
end_pattern="</voice>",
remove_match=True,
action=MatchAction.REMOVE, # Remove tags from final text
)
# Register handler for voice switching
async def on_voice_tag(match: PatternMatch):
voice_name = match.content.strip().lower()
voice_name = match.text.strip().lower()
if voice_name in VOICE_IDS:
# First flush any existing audio to finish the current context
await tts.flush_audio()
@@ -125,7 +129,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
else:
logger.warning(f"Unknown voice: {voice_name}")
pattern_aggregator.on_pattern_match("voice_tag", on_voice_tag)
pattern_aggregator.on_pattern_match("voice", on_voice_tag)
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))

View File

@@ -155,7 +155,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
You are a helpful LLM in a WebRTC call.
Your goal is to demonstrate your capabilities in a succinct way.
You have access to tools to search the Rijksmuseum collection.
Offer, for example, to show the earliest Rembrandt work from the museum. Use the `search_artwork` tool.
Offer, for example, to show a floral still life, use the `search_artwork` tool.
The tool may respond with a JSON object with an `artworks` array. Choose the art from that array.
Once the tool has responded, tell the user the title and use the `open_image_in_browser` tool.
Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points.

View File

@@ -7,6 +7,7 @@
import asyncio
import io
import json
import os
import re
import shutil
@@ -15,7 +16,7 @@ import aiohttp
from dotenv import load_dotenv
from loguru import logger
from mcp import StdioServerParameters
from mcp.client.session_group import SseServerParameters
from mcp.client.session_group import StreamableHttpParameters
from PIL import Image
from pipecat.adapters.schemas.tools_schema import ToolsSchema
@@ -66,10 +67,12 @@ class UrlToImageProcessor(FrameProcessor):
await self.push_frame(frame, direction)
def extract_url(self, text: str):
pattern = r"!\[[^\]]*\]\((https?://[^)]+\.(png|jpg|jpeg|PNG|JPG|JPEG|gif))\)"
match = re.search(pattern, text)
if match:
return match.group(1)
data = json.loads(text)
if "artObject" in data:
return data["artObject"]["webImage"]["url"]
if "artworks" in data and len(data["artworks"]):
return data["artworks"][0]["webImage"]["url"]
return None
async def run_image_process(self, image_url: str):
@@ -132,10 +135,11 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
system = f"""
You are a helpful LLM in a WebRTC call.
Your goal is to demonstrate your capabilities in a succinct way.
You have access to tools to search the Rijksmuseum collection.
Offer, for example, to show the earliest Rembrandt work from the museum. Use the `search_artwork` tool.
You have access to tools to search the Rijksmuseum collection and the user's GitHub repositories and account.
Offer, for example, to show a floral still life, use the `search_artwork` tool.
The tool may respond with a JSON object with an `artworks` array. Choose the art from that array.
Once the tool has responded, tell the user the title and use the `open_image_in_browser` tool.
You can also offer to answer users questions about their GitHub repositories and account.
Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points.
Respond to what the user said in a creative and helpful way.
Don't overexplain what you are doing.
@@ -145,11 +149,11 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
messages = [{"role": "system", "content": system}]
try:
mcp = MCPClient(
rijksmuseum_mcp = MCPClient(
server_params=StdioServerParameters(
command=shutil.which("npx"),
# https://github.com/r-huijts/rijksmuseum-mcp
args=["-y", "mcp-server-error setting up mcp"],
args=["-y", "mcp-server-rijksmuseum"],
env={"RIJKSMUSEUM_API_KEY": os.getenv("RIJKSMUSEUM_API_KEY")},
)
)
@@ -157,24 +161,32 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.error(f"error setting up rijksmuseum mcp")
logger.exception("error trace:")
try:
# https://docs.mcp.run/integrating/tutorials/mcp-run-sse-openai-agents/
# ie. "https://www.mcp.run/api/mcp/sse?..."
# ensure the profile has a tool or few installed
mcp_run = MCPClient(server_params=SseServerParameters(url=os.getenv("MCP_RUN_SSE_URL")))
# Github MCP docs: https://github.com/github/github-mcp-server
# Enable Github Copilot on your GitHub account. Free tier is ok. (https://github.com/settings/copilot)
# Generate a personal access token. It must be a Fine-grained token, classic tokens are not supported. (https://github.com/settings/personal-access-tokens)
# Set permissions you want to use (eg. "all repositories", "profile: read/write", etc)
github_mcp = MCPClient(
server_params=StreamableHttpParameters(
url="https://api.githubcopilot.com/mcp/",
headers={
"Authorization": f"Bearer {os.getenv('GITHUB_PERSONAL_ACCESS_TOKEN')}"
},
)
)
except Exception as e:
logger.error(f"error setting up mcp.run")
logger.exception("error trace:")
tools = {}
run_tools = {}
rijksmuseum_tools = {}
github_tools = {}
try:
tools = await mcp.register_tools(llm)
run_tools = await mcp_run.register_tools(llm)
rijksmuseum_tools = await rijksmuseum_mcp.register_tools(llm)
github_tools = await github_mcp.register_tools(llm)
except Exception as e:
logger.error(f"error registering tools")
logger.exception("error trace:")
all_standard_tools = run_tools.standard_tools + tools.standard_tools
all_standard_tools = rijksmuseum_tools.standard_tools + github_tools.standard_tools
all_tools = ToolsSchema(standard_tools=all_standard_tools)
context = LLMContext(messages, all_tools)
@@ -226,9 +238,9 @@ async def bot(runner_args: RunnerArguments):
if __name__ == "__main__":
if not os.getenv("RIJKSMUSEUM_API_KEY") or not os.getenv("MCP_RUN_SSE_URL"):
if not os.getenv("RIJKSMUSEUM_API_KEY") or not os.getenv("GITHUB_PERSONAL_ACCESS_TOKEN"):
logger.error(
f"Please set RIJKSMUSEUM_API_KEY and MCP_RUN_SSE_URL environment variables. See https://github.com/r-huijts/rijksmuseum-mcp and https://mcp.run"
f"Please set `RIJKSMUSEUM_API_KEY` and `GITHUB_PERSONAL_ACCESS_TOKEN` environment variables. See https://github.com/r-huijts/rijksmuseum-mcp."
)
import sys

View File

@@ -49,14 +49,14 @@ aic = [ "aic-sdk~=1.1.0" ]
anthropic = [ "anthropic~=0.49.0" ]
assemblyai = [ "pipecat-ai[websockets-base]" ]
asyncai = [ "pipecat-ai[websockets-base]" ]
aws = [ "aioboto3~=15.0.0", "pipecat-ai[websockets-base]" ]
aws-nova-sonic = [ "aws_sdk_bedrock_runtime~=0.1.1; python_version>='3.12'" ]
aws = [ "aioboto3~=15.5.0", "pipecat-ai[websockets-base]" ]
aws-nova-sonic = [ "aws_sdk_bedrock_runtime~=0.2.0; python_version>='3.12'" ]
azure = [ "azure-cognitiveservices-speech~=1.42.0"]
cartesia = [ "cartesia~=2.0.3", "pipecat-ai[websockets-base]" ]
cerebras = []
daily = [ "daily-python~=0.22.0" ]
deepgram = [ "deepgram-sdk~=4.7.0", "pipecat-ai[websockets-base]" ]
deepseek = []
daily = [ "daily-python~=0.21.0" ]
deepgram = [ "deepgram-sdk~=4.7.0" ]
elevenlabs = [ "pipecat-ai[websockets-base]" ]
fal = [ "fal-client~=0.5.9" ]
fireworks = []
@@ -69,19 +69,21 @@ gstreamer = [ "pygobject~=3.50.0" ]
heygen = [ "livekit>=1.0.13", "pipecat-ai[websockets-base]" ]
hume = [ "hume>=0.11.2" ]
inworld = []
krisp = [ "pipecat-ai-krisp~=0.4.0" ]
koala = [ "pvkoala~=2.0.3" ]
krisp = [ "pipecat-ai-krisp~=0.4.0" ]
langchain = [ "langchain~=0.3.20", "langchain-community~=0.3.20", "langchain-openai~=0.3.9" ]
livekit = [ "livekit~=1.0.13", "livekit-api~=1.0.5", "tenacity>=8.2.3,<10.0.0" ]
livekit = [ "livekit~=1.0.13", "livekit-api~=1.0.5", "tenacity>=8.2.3,<10.0.0", "pyjwt>=2.10.1" ]
lmnt = [ "pipecat-ai[websockets-base]" ]
local = [ "pyaudio~=0.2.14" ]
local-smart-turn = [ "coremltools>=8.0", "transformers", "torch>=2.5.0,<3", "torchaudio>=2.5.0,<3" ]
local-smart-turn-v3 = [ "transformers", "onnxruntime>=1.20.1,<2" ]
mcp = [ "mcp[cli]>=1.11.0,<2" ]
mem0 = [ "mem0ai~=0.1.94" ]
mistral = []
mlx-whisper = [ "mlx-whisper~=0.4.2" ]
moondream = [ "accelerate~=1.10.0", "einops~=0.8.0", "pyvips[binary]~=3.0.0", "timm~=1.0.13", "transformers>=4.48.0" ]
nim = []
neuphonic = [ "pipecat-ai[websockets-base]" ]
nim = []
noisereduce = [ "noisereduce~=3.0.3" ]
openai = [ "pipecat-ai[websockets-base]" ]
openpipe = [ "openpipe>=4.50.0,<6" ]
@@ -89,17 +91,16 @@ openrouter = []
perplexity = []
playht = [ "pipecat-ai[websockets-base]" ]
qwen = []
remote-smart-turn = []
rime = [ "pipecat-ai[websockets-base]" ]
riva = [ "nvidia-riva-client~=2.21.1" ]
runner = [ "python-dotenv>=1.0.0,<2.0.0", "uvicorn>=0.32.0,<1.0.0", "fastapi>=0.115.6,<0.122.0", "pipecat-ai-small-webrtc-prebuilt>=1.0.0"]
sagemaker = ["aws_sdk_sagemaker_runtime_http2; python_version>='3.12'"]
sambanova = []
sarvam = [ "sarvamai==0.1.21", "pipecat-ai[websockets-base]" ]
sentry = [ "sentry-sdk>=2.28.0,<3" ]
local-smart-turn = [ "coremltools>=8.0", "transformers", "torch>=2.5.0,<3", "torchaudio>=2.5.0,<3" ]
local-smart-turn-v3 = [ "transformers", "onnxruntime>=1.20.1,<2" ]
remote-smart-turn = []
silero = [ "onnxruntime>=1.20.1,<2" ]
simli = [ "simli-ai~=0.1.25"]
simli = [ "simli-ai~=1.0.3"]
soniox = [ "pipecat-ai[websockets-base]" ]
soundfile = [ "soundfile~=0.13.1" ]
speechmatics = [ "speechmatics-rt>=0.5.0" ]

View File

@@ -30,8 +30,8 @@ EVAL_SIMPLE_MATH = EvalConfig(
)
EVAL_WEATHER = EvalConfig(
prompt="What's the weather in San Francisco?",
eval="The user says something specific about the current weather in San Francisco, including the degrees.",
prompt="What's the weather in San Francisco (in farhenheit or celsius)?",
eval="The user says something specific about the current weather in San Francisco, including the degrees (in farhenheit or celsius).",
)
EVAL_ONLINE_SEARCH = EvalConfig(
@@ -70,7 +70,7 @@ EVAL_VOICEMAIL = EvalConfig(
EVAL_CONVERSATION = EvalConfig(
prompt="Hello, this is Mark.",
eval="The user replies with a greeting.",
eval="The user acknowledges the greeting.",
eval_speaks_first=True,
)

View File

@@ -31,7 +31,11 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.llm_service import LLMService
from pipecat.utils.text.pattern_pair_aggregator import PatternMatch, PatternPairAggregator
from pipecat.utils.text.pattern_pair_aggregator import (
MatchAction,
PatternMatch,
PatternPairAggregator,
)
class IVRStatus(Enum):
@@ -114,15 +118,15 @@ class IVRProcessor(FrameProcessor):
def _setup_xml_patterns(self):
"""Set up XML pattern detection and handlers."""
# Register DTMF pattern
self._aggregator.add_pattern_pair("dtmf", "<dtmf>", "</dtmf>", remove_match=True)
self._aggregator.add_pattern("dtmf", "<dtmf>", "</dtmf>", action=MatchAction.REMOVE)
self._aggregator.on_pattern_match("dtmf", self._handle_dtmf_action)
# Register mode pattern
self._aggregator.add_pattern_pair("mode", "<mode>", "</mode>", remove_match=True)
self._aggregator.add_pattern("mode", "<mode>", "</mode>", action=MatchAction.REMOVE)
self._aggregator.on_pattern_match("mode", self._handle_mode_action)
# Register IVR pattern
self._aggregator.add_pattern_pair("ivr", "<ivr>", "</ivr>", remove_match=True)
self._aggregator.add_pattern("ivr", "<ivr>", "</ivr>", action=MatchAction.REMOVE)
self._aggregator.on_pattern_match("ivr", self._handle_ivr_action)
async def process_frame(self, frame: Frame, direction: FrameDirection):
@@ -148,7 +152,7 @@ class IVRProcessor(FrameProcessor):
result = await self._aggregator.aggregate(frame.text)
if result:
# Push aggregated text that doesn't contain XML patterns
await self.push_frame(LLMTextFrame(result), direction)
await self.push_frame(LLMTextFrame(result.text), direction)
else:
await self.push_frame(frame, direction)
@@ -159,7 +163,7 @@ class IVRProcessor(FrameProcessor):
Args:
match: The pattern match containing DTMF content.
"""
value = match.content
value = match.text
logger.debug(f"DTMF detected: {value}")
try:
@@ -180,7 +184,7 @@ class IVRProcessor(FrameProcessor):
Args:
match: The pattern match containing IVR status content.
"""
status = match.content
status = match.text
logger.trace(f"IVR status detected: {status}")
# Convert string to enum, with validation
@@ -211,7 +215,7 @@ class IVRProcessor(FrameProcessor):
Args:
match: The pattern match containing mode content.
"""
mode = match.content
mode = match.text
logger.debug(f"Mode detected: {mode}")
if mode == "conversation":
await self._handle_conversation()

View File

@@ -12,6 +12,7 @@ and LLM processing.
"""
from dataclasses import dataclass, field
from enum import Enum
from typing import (
TYPE_CHECKING,
Any,
@@ -326,22 +327,21 @@ class TextFrame(DataFrame):
Parameters:
text: The text content.
skip_tts: Whether this text should skip TTS processing.
"""
text: str
skip_tts: bool = field(init=False)
skip_tts: bool = field(default=False, kw_only=True)
# Whether any necessary inter-frame (leading/trailing) spaces are already
# included in the text.
# NOTE: Ideally this would be available at init time with a default value,
# but that would impact how subclasses can be initialized (it would require
# mandatory fields of theirs to have defaults to preserve
# non-default-before-default argument order)
includes_inter_frame_spaces: bool = field(init=False)
# Whether this text frame should be appended to the LLM context.
append_to_context: bool = field(init=False)
def __post_init__(self):
super().__post_init__()
self.skip_tts = False
self.includes_inter_frame_spaces = False
self.append_to_context = True
def __str__(self):
pts = format_pts(self.pts)
@@ -352,11 +352,38 @@ class TextFrame(DataFrame):
class LLMTextFrame(TextFrame):
"""Text frame generated by LLM services."""
pass
def __post_init__(self):
super().__post_init__()
# LLM services send text frames with all necessary spaces included
self.includes_inter_frame_spaces = True
class AggregationType(str, Enum):
"""Built-in aggregation strings."""
SENTENCE = "sentence"
WORD = "word"
def __str__(self):
return self.value
@dataclass
class TTSTextFrame(TextFrame):
class AggregatedTextFrame(TextFrame):
"""Text frame representing an aggregation of TextFrames.
This frame contains multiple TextFrames aggregated together for processing
or output along with a field to indicate how they are aggregated.
Parameters:
aggregated_by: Method used to aggregate the text frames.
"""
aggregated_by: AggregationType | str
@dataclass
class TTSTextFrame(AggregatedTextFrame):
"""Text frame generated by Text-to-Speech services."""
pass
@@ -804,11 +831,13 @@ class ErrorFrame(SystemFrame):
error: Description of the error that occurred.
fatal: Whether the error is fatal and requires bot shutdown.
processor: The frame processor that generated the error.
exception: The exception that occurred.
"""
error: str
fatal: bool = False
processor: Optional["FrameProcessor"] = None
exception: Optional[Exception] = None
def __str__(self):
return f"{self.name}(error: {self.error}, fatal: {self.fatal})"
@@ -1597,24 +1626,23 @@ class LLMFullResponseStartFrame(ControlFrame):
Used to indicate the beginning of an LLM response. Followed by one or
more TextFrames and a final LLMFullResponseEndFrame.
Parameters:
skip_tts: Whether LLM output should skip TTS processing.
"""
skip_tts: bool = field(init=False)
def __post_init__(self):
super().__post_init__()
self.skip_tts = False
skip_tts: bool = field(default=False, kw_only=True)
@dataclass
class LLMFullResponseEndFrame(ControlFrame):
"""Frame indicating the end of an LLM response."""
"""Frame indicating the end of an LLM response.
skip_tts: bool = field(init=False)
Parameters:
skip_tts: Whether LLM output should skip TTS processing.
"""
def __post_init__(self):
super().__post_init__()
self.skip_tts = False
skip_tts: bool = field(default=False, kw_only=True)
@dataclass

View File

@@ -14,6 +14,7 @@ translation from this universal context into whatever format it needs, using a
service-specific adapter.
"""
import asyncio
import base64
import io
import wave
@@ -137,7 +138,7 @@ class LLMContext:
return {"role": role, "content": content}
@staticmethod
def create_image_message(
async def create_image_message(
*,
role: str = "user",
format: str,
@@ -154,15 +155,21 @@ class LLMContext:
image: Raw image bytes.
text: Optional text to include with the image.
"""
buffer = io.BytesIO()
Image.frombytes(format, size, image).save(buffer, format="JPEG")
encoded_image = base64.b64encode(buffer.getvalue()).decode("utf-8")
def encode_image():
buffer = io.BytesIO()
Image.frombytes(format, size, image).save(buffer, format="JPEG")
encoded_image = base64.b64encode(buffer.getvalue()).decode("utf-8")
return encoded_image
encoded_image = await asyncio.to_thread(encode_image)
url = f"data:image/jpeg;base64,{encoded_image}"
return LLMContext.create_image_url_message(role=role, url=url, text=text)
@staticmethod
def create_audio_message(
async def create_audio_message(
*, role: str = "user", audio_frames: list[AudioRawFrame], text: str = "Audio follows"
) -> LLMContextMessage:
"""Create a context message containing audio.
@@ -172,21 +179,26 @@ class LLMContext:
audio_frames: List of audio frame objects to include.
text: Optional text to include with the audio.
"""
sample_rate = audio_frames[0].sample_rate
num_channels = audio_frames[0].num_channels
content = []
content.append({"type": "text", "text": text})
data = b"".join(frame.audio for frame in audio_frames)
async def encode_audio():
sample_rate = audio_frames[0].sample_rate
num_channels = audio_frames[0].num_channels
with io.BytesIO() as buffer:
with wave.open(buffer, "wb") as wf:
wf.setsampwidth(2)
wf.setnchannels(num_channels)
wf.setframerate(sample_rate)
wf.writeframes(data)
content = []
content.append({"type": "text", "text": text})
data = b"".join(frame.audio for frame in audio_frames)
encoded_audio = base64.b64encode(buffer.getvalue()).decode("utf-8")
with io.BytesIO() as buffer:
with wave.open(buffer, "wb") as wf:
wf.setsampwidth(2)
wf.setnchannels(num_channels)
wf.setframerate(sample_rate)
wf.writeframes(data)
encoded_audio = base64.b64encode(buffer.getvalue()).decode("utf-8")
return encoded_audio
encoded_audio = await asyncio.to_thread(encode_audio)
content.append(
{
@@ -321,7 +333,7 @@ class LLMContext:
"""
self._tool_choice = tool_choice
def add_image_frame_message(
async def add_image_frame_message(
self, *, format: str, size: tuple[int, int], image: bytes, text: Optional[str] = None
):
"""Add a message containing an image frame.
@@ -332,10 +344,12 @@ class LLMContext:
image: Raw image bytes.
text: Optional text to include with the image.
"""
message = LLMContext.create_image_message(format=format, size=size, image=image, text=text)
message = await LLMContext.create_image_message(
format=format, size=size, image=image, text=text
)
self.add_message(message)
def add_audio_frames_message(
async def add_audio_frames_message(
self, *, audio_frames: list[AudioRawFrame], text: str = "Audio follows"
):
"""Add a message containing audio frames.
@@ -344,7 +358,7 @@ class LLMContext:
audio_frames: List of audio frame objects to include.
text: Optional text to include with the audio.
"""
message = LLMContext.create_audio_message(audio_frames=audio_frames, text=text)
message = await LLMContext.create_audio_message(audio_frames=audio_frames, text=text)
self.add_message(message)
@staticmethod

View File

@@ -1001,7 +1001,7 @@ class LLMAssistantContextAggregator(LLMContextResponseAggregator):
await self.push_aggregation()
async def _handle_text(self, frame: TextFrame):
if not self._started:
if not self._started or not frame.append_to_context:
return
if self._params.expect_stripped_words:

View File

@@ -66,7 +66,7 @@ from pipecat.processors.aggregators.llm_response import (
LLMUserAggregatorParams,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.utils.string import concatenate_aggregated_text
from pipecat.utils.string import TextPartForConcatenation, concatenate_aggregated_text
from pipecat.utils.time import time_now_iso8601
@@ -90,15 +90,7 @@ class LLMContextAggregator(FrameProcessor):
self._context = context
self._role = role
self._aggregation: List[str] = []
# Whether to add spaces between text parts.
# (Currently only used by LLMAssistantAggregator, but could be expanded
# to LLMUserAggregator in the future if needed; that would require
# additional work since LLMUserAggregator currently trims spaces from
# incoming frames before determining whether it "really" received any
# text).
self._add_spaces = True
self._aggregation: List[TextPartForConcatenation] = []
@property
def messages(self) -> List[LLMContextMessage]:
@@ -191,7 +183,7 @@ class LLMContextAggregator(FrameProcessor):
Returns:
The concatenated aggregation string.
"""
return concatenate_aggregated_text(self._aggregation, self._add_spaces)
return concatenate_aggregated_text(self._aggregation)
class LLMUserAggregator(LLMContextAggregator):
@@ -441,7 +433,12 @@ class LLMUserAggregator(LLMContextAggregator):
if not text.strip():
return
self._aggregation.append(text)
# Transcriptions never include inter-part spaces (so far).
self._aggregation.append(
TextPartForConcatenation(
text, includes_inter_part_spaces=frame.includes_inter_frame_spaces
)
)
# We just got a final result, so let's reset interim results.
self._seen_interim_results = False
# Reset aggregation timer.
@@ -796,7 +793,7 @@ class LLMAssistantAggregator(LLMContextAggregator):
logger.debug(f"{self} Appending UserImageRawFrame to LLM context (size: {frame.size})")
self._context.add_image_frame_message(
await self._context.add_image_frame_message(
format=frame.format,
size=frame.size,
image=frame.image,
@@ -814,18 +811,18 @@ class LLMAssistantAggregator(LLMContextAggregator):
await self.push_aggregation()
async def _handle_text(self, frame: TextFrame):
if not self._started:
if not self._started or not frame.append_to_context:
return
# Make sure we really have text (spaces count, too!)
if len(frame.text) == 0:
return
# Track whether we need to add spaces between text parts
# Assumption: we can just keep track of the latest frame's value
self._add_spaces = not frame.includes_inter_frame_spaces
self._aggregation.append(frame.text)
self._aggregation.append(
TextPartForConcatenation(
frame.text, includes_inter_part_spaces=frame.includes_inter_frame_spaces
)
)
def _context_updated_task_finished(self, task: asyncio.Task):
self._context_updated_tasks.discard(task)

View File

@@ -0,0 +1,106 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""LLM text processor module for processing and aggregating raw LLM output text.
This processor will convert LLMTextFrames into AggregatedTextFrames based on the
configured text aggregator. Using the customizable aggregator, it provides
functionality to handle or manipulate LLM text frames before they are sent to other
components such as TTS services or context aggregators. It can be used to pre-aggregate
and categorize, modify, or filter direct output tokens from the LLM.
"""
from typing import Optional
from pipecat.frames.frames import (
AggregatedTextFrame,
EndFrame,
Frame,
InterruptionFrame,
LLMFullResponseEndFrame,
LLMTextFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.utils.text.base_text_aggregator import BaseTextAggregator
from pipecat.utils.text.simple_text_aggregator import SimpleTextAggregator
class LLMTextProcessor(FrameProcessor):
"""A processor for handling or manipulating LLM text frames before they are processed further.
This processor will convert LLMTextFrames into AggregatedTextFrames based on the configured
text aggregator. Using the customizable aggregator, it provides functionality to handle or
manipulate LLM text frames before they are sent to other components such as TTS services or
context aggregators. It can be used to pre-aggregate and categorize, modify, or filter direct
output tokens from the LLM.
"""
def __init__(self, *, text_aggregator: Optional[BaseTextAggregator] = None, **kwargs):
"""Initialize the LLM text processor.
Args:
text_aggregator: An optional text aggregator to use for processing LLM text frames. By
default, a SimpleTextAggregator aggregating by sentence will be used.
**kwargs: Additional arguments passed to parent class.
TODO: Allow transformations per aggregation type or all (and deprecate the TTS filters).
"""
super().__init__(**kwargs)
self._text_aggregator: BaseTextAggregator = text_aggregator or SimpleTextAggregator()
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process an LLMTextFrames using the aggregator to generate AggregatedTextFrames.
Args:
frame: The frame to process.
direction: The direction of frame flow in the pipeline.
"""
await super().process_frame(frame, direction)
if isinstance(frame, InterruptionFrame):
await self._handle_interruption(frame)
await self.push_frame(frame, direction)
elif isinstance(frame, LLMTextFrame):
await self._handle_llm_text(frame)
elif isinstance(frame, LLMFullResponseEndFrame):
await self._handle_llm_end(frame.skip_tts)
await self.push_frame(frame, direction)
elif isinstance(frame, EndFrame):
await self._handle_llm_end()
await self.push_frame(frame, direction)
else:
await self.push_frame(frame, direction)
async def _handle_interruption(self, _):
"""Handle interruptions by resetting the text aggregator."""
await self._text_aggregator.handle_interruption()
async def reset(self):
"""Reset the internal state of the text processor and its aggregator."""
await self._text_aggregator.reset()
async def _handle_llm_text(self, in_frame: LLMTextFrame):
aggregation = await self._text_aggregator.aggregate(in_frame.text)
if aggregation:
out_frame = AggregatedTextFrame(
text=aggregation.text,
aggregated_by=aggregation.type,
)
out_frame.skip_tts = in_frame.skip_tts
await self.push_frame(out_frame)
async def _handle_llm_end(self, skip_tts: bool = False):
# Flush any remaining aggregated text at the end of the LLM response
aggregation = self._text_aggregator.text
await self._text_aggregator.reset()
text = aggregation.text.strip()
if text:
out_frame = AggregatedTextFrame(
text=text,
aggregated_by=aggregation.type,
)
out_frame.skip_tts = skip_tts
await self.push_frame(out_frame)

View File

@@ -83,4 +83,4 @@ class ConsumerProcessor(FrameProcessor):
while True:
frame = await self._queue.get()
new_frame = await self._transformer(frame)
await self.push_frame(new_frame, self._direction)
await self.queue_frame(new_frame, self._direction)

View File

@@ -126,6 +126,4 @@ class WakeCheckFilter(FrameProcessor):
else:
await self.push_frame(frame, direction)
except Exception as e:
error_msg = f"Error in wake word filter: {e}"
logger.exception(error_msg)
await self.push_error(ErrorFrame(error_msg))
await self.push_error(error_msg=f"Error in wake word filter: {e}", exception=e)

View File

@@ -142,6 +142,7 @@ class FrameProcessor(BaseObject):
- on_after_process_frame: Called after a frame is processed
- on_before_push_frame: Called before a frame is pushed
- on_after_push_frame: Called after a frame is pushed
- on_error: Called when an error is raised in the frame processing.
"""
def __init__(
@@ -234,6 +235,7 @@ class FrameProcessor(BaseObject):
self._register_event_handler("on_after_process_frame", sync=True)
self._register_event_handler("on_before_push_frame", sync=True)
self._register_event_handler("on_after_push_frame", sync=True)
self._register_event_handler("on_error", sync=True)
@property
def id(self) -> int:
@@ -630,7 +632,43 @@ class FrameProcessor(BaseObject):
elif isinstance(frame, (FrameProcessorResumeFrame, FrameProcessorResumeUrgentFrame)):
await self.__resume(frame)
async def push_error(self, error: ErrorFrame):
async def push_error(
self,
error_msg: str,
exception: Optional[Exception] = None,
fatal: bool = False,
):
"""Creates and pushes an ErrorFrame upstream.
Creates and pushes an ErrorFrame upstream to notify other processors in the
pipeline about an error condition. The error frame will include context about
which processor generated the error.
Args:
error_msg: Descriptive message explaining the error condition.
exception: Optional exception object that caused the error, if available.
This provides additional context for debugging and error handling.
fatal: Whether this error should be considered fatal to the pipeline.
Fatal errors typically cause the entire pipeline to stop processing.
Defaults to False for non-fatal errors.
Example::
```python
# Non-fatal error
await self.push_error("Failed to process audio chunk, skipping")
# Fatal error with exception context
try:
result = some_critical_operation()
except Exception as e:
await self.push_error("Critical operation failed", exception=e, fatal=True)
```
"""
error_frame = ErrorFrame(error=error_msg, fatal=fatal, exception=exception, processor=self)
await self.push_error_frame(error=error_frame)
async def push_error_frame(self, error: ErrorFrame):
"""Push an error frame upstream.
Args:
@@ -638,6 +676,8 @@ class FrameProcessor(BaseObject):
"""
if not error.processor:
error.processor = self
await self._call_event_handler("on_error", error)
logger.error(f"{error.processor} error: {error.error}")
await self.push_frame(error, FrameDirection.UPSTREAM)
async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
@@ -759,8 +799,10 @@ class FrameProcessor(BaseObject):
await self.__cancel_process_task()
self.__create_process_task()
except Exception as e:
logger.exception(f"Uncaught exception in {self} when handling _start_interruption: {e}")
await self.push_error(ErrorFrame(str(e)))
await self.push_error(
error_msg=f"Uncaught exception handling _start_interruption: {e}",
exception=e,
)
async def __internal_push_frame(self, frame: Frame, direction: FrameDirection):
"""Internal method to push frames to adjacent processors.
@@ -797,8 +839,7 @@ class FrameProcessor(BaseObject):
await self._observer.on_push_frame(data)
await self._prev.queue_frame(frame, direction)
except Exception as e:
logger.exception(f"Uncaught exception in {self}: {e}")
await self.push_error(ErrorFrame(str(e)))
await self.push_error(error_msg=f"Uncaught exception: {e}", exception=e)
def _check_started(self, frame: Frame):
"""Check if the processor has been started.
@@ -874,8 +915,7 @@ class FrameProcessor(BaseObject):
await self._call_event_handler("on_after_process_frame", frame)
except Exception as e:
logger.exception(f"{self}: error processing frame: {e}")
await self.push_error(ErrorFrame(str(e)))
await self.push_error(error_msg=f"Error processing frame: {e}", exception=e)
async def __input_frame_task_handler(self):
"""Handle frames from the input queue.

View File

@@ -24,7 +24,7 @@ try:
from langchain_core.messages import AIMessageChunk
from langchain_core.runnables import Runnable
except ModuleNotFoundError as e:
logger.exception("In order to use Langchain, you need to `pip install pipecat-ai[langchain]`. ")
logger.error("In order to use Langchain, you need to `pip install pipecat-ai[langchain]`. ")
raise Exception(f"Missing module: {e}")
@@ -113,6 +113,6 @@ class LangchainProcessor(FrameProcessor):
except GeneratorExit:
logger.warning(f"{self} generator was closed prematurely")
except Exception as e:
logger.exception(f"{self} an unknown error occurred: {e}")
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
finally:
await self.push_frame(LLMFullResponseEndFrame())

View File

@@ -24,6 +24,7 @@ from typing import (
Literal,
Mapping,
Optional,
Tuple,
Union,
)
@@ -32,6 +33,8 @@ from pydantic import BaseModel, Field, PrivateAttr, ValidationError
from pipecat.audio.utils import calculate_audio_volume
from pipecat.frames.frames import (
AggregatedTextFrame,
AggregationType,
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
CancelFrame,
@@ -704,6 +707,29 @@ class RTVITextMessageData(BaseModel):
text: str
class RTVIBotOutputMessageData(RTVITextMessageData):
"""Data for bot output RTVI messages.
Extends RTVITextMessageData to include metadata about the output.
"""
spoken: bool = False # Indicates if the text has been spoken by TTS
aggregated_by: AggregationType | str
# Indicates what form the text is in (e.g., by word, sentence, etc.)
class RTVIBotOutputMessage(BaseModel):
"""Message containing bot output text.
An event meant to holistically represent what the bot is outputting,
along with metadata about the output and if it has been spoken.
"""
label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
type: Literal["bot-output"] = "bot-output"
data: RTVIBotOutputMessageData
class RTVIBotTranscriptionMessage(BaseModel):
"""Message containing bot transcription text.
@@ -896,6 +922,7 @@ class RTVIObserverParams:
Parameter `errors_enabled` is deprecated. Error messages are always enabled.
Parameters:
bot_output_enabled: Indicates if bot output messages should be sent.
bot_llm_enabled: Indicates if the bot's LLM messages should be sent.
bot_tts_enabled: Indicates if the bot's TTS messages should be sent.
bot_speaking_enabled: Indicates if the bot's started/stopped speaking messages should be sent.
@@ -907,9 +934,17 @@ class RTVIObserverParams:
metrics_enabled: Indicates if metrics messages should be sent.
system_logs_enabled: Indicates if system logs should be sent.
errors_enabled: [Deprecated] Indicates if errors messages should be sent.
skip_aggregator_types: List of aggregation types to skip sending as tts/output messages.
Note: if using this to avoid sending secure information, be sure to also disable
bot_llm_enabled to avoid leaking through LLM messages.
bot_output_transforms: A list of callables to transform text before just before sending it
to TTS. Each callable takes the aggregated text and its type, and returns the
transformed text. To register, provide a list of tuples of
(aggregation_type | '*', transform_function).
audio_level_period_secs: How often audio levels should be sent if enabled.
"""
bot_output_enabled: bool = True
bot_llm_enabled: bool = True
bot_tts_enabled: bool = True
bot_speaking_enabled: bool = True
@@ -921,6 +956,15 @@ class RTVIObserverParams:
metrics_enabled: bool = True
system_logs_enabled: bool = False
errors_enabled: Optional[bool] = None
skip_aggregator_types: Optional[List[AggregationType | str]] = None
bot_output_transforms: Optional[
List[
Tuple[
AggregationType | str,
Callable[[str, AggregationType | str], Awaitable[str]],
]
]
] = None
audio_level_period_secs: float = 0.15
@@ -973,8 +1017,45 @@ class RTVIObserver(BaseObserver):
DeprecationWarning,
)
self._aggregation_transforms: List[
Tuple[AggregationType | str, Callable[[str, AggregationType | str], Awaitable[str]]]
] = self._params.bot_output_transforms or []
def add_bot_output_transformer(
self,
transform_function: Callable[[str, AggregationType | str], Awaitable[str]],
aggregation_type: AggregationType | str = "*",
):
"""Transform text for a specific aggregation type before sending as Bot Output or TTS.
Args:
transform_function: The function to apply for transformation. This function should take
the text and aggregation type as input and return the transformed text.
Ex.: async def my_transform(text: str, aggregation_type: str) -> str:
aggregation_type: The type of aggregation to transform. This value defaults to "*" to
handle all text before sending to the client.
"""
self._aggregation_transforms.append((aggregation_type, transform_function))
def remove_bot_output_transformer(
self,
transform_function: Callable[[str, AggregationType | str], Awaitable[str]],
aggregation_type: AggregationType | str = "*",
):
"""Remove a text transformer for a specific aggregation type.
Args:
transform_function: The function to remove.
aggregation_type: The type of aggregation to remove the transformer for.
"""
self._aggregation_transforms = [
(agg_type, func)
for agg_type, func in self._aggregation_transforms
if not (agg_type == aggregation_type and func == transform_function)
]
async def _logger_sink(self, message):
"""Logger sink so we cna send system logs to RTVI clients."""
"""Logger sink so we can send system logs to RTVI clients."""
message = RTVISystemLogMessage(data=RTVITextMessageData(text=message))
await self.send_rtvi_message(message)
@@ -1048,12 +1129,15 @@ class RTVIObserver(BaseObserver):
await self.send_rtvi_message(RTVIBotTTSStartedMessage())
elif isinstance(frame, TTSStoppedFrame) and self._params.bot_tts_enabled:
await self.send_rtvi_message(RTVIBotTTSStoppedMessage())
elif isinstance(frame, TTSTextFrame) and self._params.bot_tts_enabled:
if isinstance(src, BaseOutputTransport):
message = RTVIBotTTSTextMessage(data=RTVITextMessageData(text=frame.text))
await self.send_rtvi_message(message)
else:
elif isinstance(frame, AggregatedTextFrame) and (
self._params.bot_output_enabled or self._params.bot_tts_enabled
):
if isinstance(frame, TTSTextFrame) and not isinstance(src, BaseOutputTransport):
# This check is to make sure we handle the frame when it has gone
# through the transport and has correct timing.
mark_as_seen = False
else:
await self._handle_aggregated_llm_text(frame)
elif isinstance(frame, MetricsFrame) and self._params.metrics_enabled:
await self._handle_metrics(frame)
elif isinstance(frame, RTVIServerMessageFrame):
@@ -1084,15 +1168,6 @@ class RTVIObserver(BaseObserver):
if mark_as_seen:
self._frames_seen.add(frame.id)
async def _push_bot_transcription(self):
"""Push accumulated bot transcription as a message."""
if len(self._bot_transcription) > 0:
message = RTVIBotTranscriptionMessage(
data=RTVITextMessageData(text=self._bot_transcription)
)
await self.send_rtvi_message(message)
self._bot_transcription = ""
async def _handle_interruptions(self, frame: Frame):
"""Handle user speaking interruption frames."""
message = None
@@ -1115,14 +1190,45 @@ class RTVIObserver(BaseObserver):
if message:
await self.send_rtvi_message(message)
async def _handle_aggregated_llm_text(self, frame: AggregatedTextFrame):
"""Handle aggregated LLM text output frames."""
# Skip certain aggregator types if configured to do so.
if (
self._params.skip_aggregator_types
and frame.aggregated_by in self._params.skip_aggregator_types
):
return
text = frame.text
type = frame.aggregated_by
for aggregation_type, transform in self._aggregation_transforms:
if aggregation_type == type or aggregation_type == "*":
text = await transform(text, type)
isTTS = isinstance(frame, TTSTextFrame)
if self._params.bot_output_enabled:
message = RTVIBotOutputMessage(
data=RTVIBotOutputMessageData(text=text, spoken=isTTS, aggregated_by=type)
)
await self.send_rtvi_message(message)
if isTTS and self._params.bot_tts_enabled:
tts_message = RTVIBotTTSTextMessage(data=RTVITextMessageData(text=text))
await self.send_rtvi_message(tts_message)
async def _handle_llm_text_frame(self, frame: LLMTextFrame):
"""Handle LLM text output frames."""
message = RTVIBotLLMTextMessage(data=RTVITextMessageData(text=frame.text))
await self.send_rtvi_message(message)
# TODO (mrkb): Remove all this logic when we fully deprecate bot-transcription messages.
self._bot_transcription += frame.text
if match_endofsentence(self._bot_transcription):
await self._push_bot_transcription()
if match_endofsentence(self._bot_transcription) and len(self._bot_transcription) > 0:
await self.send_rtvi_message(
RTVIBotTranscriptionMessage(data=RTVITextMessageData(text=self._bot_transcription))
)
self._bot_transcription = ""
async def _handle_user_transcriptions(self, frame: Frame):
"""Handle user transcription frames."""
@@ -1248,7 +1354,7 @@ class RTVIProcessor(FrameProcessor):
# Default to 0.3.0 which is the last version before actually having a
# "client-version".
self._client_version = [0, 3, 0]
self._skip_tts: bool = False # Keep in sync with llm_service.py
self._llm_skip_tts: bool = False # Keep in sync with llm_service.py's configuration.
self._registered_actions: Dict[str, RTVIAction] = {}
self._registered_services: Dict[str, RTVIService] = {}
@@ -1441,7 +1547,7 @@ class RTVIProcessor(FrameProcessor):
elif isinstance(frame, RTVIActionFrame):
await self._action_queue.put(frame)
elif isinstance(frame, LLMConfigureOutputFrame):
self._skip_tts = frame.skip_tts
self._llm_skip_tts = frame.skip_tts
await self.push_frame(frame, direction)
# Other frames
else:
@@ -1697,9 +1803,9 @@ class RTVIProcessor(FrameProcessor):
opts = data.options if data.options is not None else RTVISendTextOptions()
if opts.run_immediately:
await self.interrupt_bot()
cur_skip_tts = self._skip_tts
cur_llm_skip_tts = self._llm_skip_tts
should_skip_tts = not opts.audio_response
toggle_skip_tts = cur_skip_tts != should_skip_tts
toggle_skip_tts = cur_llm_skip_tts != should_skip_tts
if toggle_skip_tts:
output_frame = LLMConfigureOutputFrame(skip_tts=should_skip_tts)
await self.push_frame(output_frame)
@@ -1709,7 +1815,7 @@ class RTVIProcessor(FrameProcessor):
)
await self.push_frame(text_frame)
if toggle_skip_tts:
output_frame = LLMConfigureOutputFrame(skip_tts=cur_skip_tts)
output_frame = LLMConfigureOutputFrame(skip_tts=cur_llm_skip_tts)
await self.push_frame(output_frame)
async def _handle_update_context(self, data: RTVIAppendToContextData):

View File

@@ -23,7 +23,7 @@ try:
from strands import Agent
from strands.multiagent.graph import Graph
except ModuleNotFoundError as e:
logger.exception("In order to use Strands Agents, you need to `pip install strands-agents`.")
logger.error("In order to use Strands Agents, you need to `pip install strands-agents`.")
raise Exception(f"Missing module: {e}")
@@ -143,7 +143,7 @@ class StrandsAgentsProcessor(FrameProcessor):
except GeneratorExit:
logger.warning(f"{self} generator was closed prematurely")
except Exception as e:
logger.exception(f"{self} an unknown error occurred: {e}")
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
finally:
if ttfb_tracking:
await self.stop_ttfb_metrics()

View File

@@ -26,7 +26,7 @@ from pipecat.frames.frames import (
TTSTextFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.utils.string import concatenate_aggregated_text
from pipecat.utils.string import TextPartForConcatenation, concatenate_aggregated_text
from pipecat.utils.time import time_now_iso8601
@@ -98,15 +98,9 @@ class AssistantTranscriptProcessor(BaseTranscriptProcessor):
**kwargs: Additional arguments passed to parent class.
"""
super().__init__(**kwargs)
self._current_text_parts: List[str] = []
self._current_text_parts: List[TextPartForConcatenation] = []
self._aggregation_start_time: Optional[str] = None
# Whether to add spaces between text parts.
# (The use of this could be expanded to the UserTranscriptProcessor in
# the future if needed; currently the UserTranscriptProcessor assumes
# that user transcription frames do not need aggregation).
self._add_spaces = True
async def _emit_aggregated_text(self):
"""Aggregates and emits text fragments as a transcript message.
@@ -147,7 +141,7 @@ class AssistantTranscriptProcessor(BaseTranscriptProcessor):
Result: "Hello there how are you"
"""
if self._current_text_parts and self._aggregation_start_time:
content = concatenate_aggregated_text(self._current_text_parts, self._add_spaces)
content = concatenate_aggregated_text(self._current_text_parts)
if content:
logger.trace(f"Emitting aggregated assistant message: {content}")
message = TranscriptionMessage(
@@ -191,11 +185,11 @@ class AssistantTranscriptProcessor(BaseTranscriptProcessor):
if not self._aggregation_start_time:
self._aggregation_start_time = time_now_iso8601()
# Track whether we need to add spaces between text parts
# Assumption: we can just keep track of the latest frame's value
self._add_spaces = not frame.includes_inter_frame_spaces
self._current_text_parts.append(frame.text)
self._current_text_parts.append(
TextPartForConcatenation(
frame.text, includes_inter_part_spaces=frame.includes_inter_frame_spaces
)
)
# Push frame.
await self.push_frame(frame, direction)

View File

@@ -264,7 +264,10 @@ def _setup_webrtc_routes(
# Prepare runner arguments with the callback to run your bot
async def webrtc_connection_callback(connection):
bot_module = _get_bot_module()
runner_args = SmallWebRTCRunnerArguments(webrtc_connection=connection)
runner_args = SmallWebRTCRunnerArguments(
webrtc_connection=connection, body=request.request_data
)
background_tasks.add_task(bot_module.bot, runner_args)
# Delegate handling to SmallWebRTCRequestHandler
@@ -326,7 +329,8 @@ def _setup_webrtc_routes(
type=request_data["type"],
pc_id=request_data.get("pc_id"),
restart_pc=request_data.get("restart_pc"),
request_data=request_data,
request_data=request_data.get("request_data")
or request_data.get("requestData"),
)
return await offer(webrtc_request, background_tasks)
elif request.method == HTTPMethod.PATCH.value:

View File

@@ -281,6 +281,14 @@ async def maybe_capture_participant_camera(
except ImportError:
pass
try:
from pipecat.transports.smallwebrtc.transport import SmallWebRTCTransport
if isinstance(transport, SmallWebRTCTransport):
await transport.capture_participant_video(video_source="camera")
except ImportError:
pass
async def maybe_capture_participant_screen(
transport: BaseTransport, client: Any, framerate: int = 0
@@ -303,6 +311,14 @@ async def maybe_capture_participant_screen(
except ImportError:
pass
try:
from pipecat.transports.smallwebrtc.transport import SmallWebRTCTransport
if isinstance(transport, SmallWebRTCTransport):
await transport.capture_participant_video(video_source="screenVideo")
except ImportError:
pass
def _smallwebrtc_sdp_cleanup_ice_candidates(text: str, pattern: str) -> str:
"""Clean up ICE candidates in SDP text for SmallWebRTC.

View File

@@ -199,7 +199,7 @@ class PlivoFrameSerializer(FrameSerializer):
)
except Exception as e:
logger.exception(f"Failed to hang up Plivo call: {e}")
logger.error(f"Failed to hang up Plivo call: {e}")
async def deserialize(self, data: str | bytes) -> Frame | None:
"""Deserializes Plivo WebSocket data to Pipecat frames.

View File

@@ -225,7 +225,7 @@ class TelnyxFrameSerializer(FrameSerializer):
)
except Exception as e:
logger.exception(f"Failed to hang up Telnyx call: {e}")
logger.error(f"Failed to hang up Telnyx call: {e}")
async def deserialize(self, data: str | bytes) -> Frame | None:
"""Deserializes Telnyx WebSocket data to Pipecat frames.

View File

@@ -236,7 +236,7 @@ class TwilioFrameSerializer(FrameSerializer):
)
except Exception as e:
logger.exception(f"Failed to hang up Twilio call: {e}")
logger.error(f"Failed to hang up Twilio call: {e}")
async def deserialize(self, data: str | bytes) -> Frame | None:
"""Deserializes Twilio WebSocket data to Pipecat frames.

View File

@@ -166,6 +166,6 @@ class AIService(FrameProcessor):
async for f in generator:
if f:
if isinstance(f, ErrorFrame):
await self.push_error(f)
await self.push_error_frame(f)
else:
await self.push_frame(f)

View File

@@ -327,7 +327,7 @@ class AnthropicLLMService(LLMService):
cache_read_input_tokens = 0
try:
await self.push_frame(LLMFullResponseStartFrame())
await self.push_frame(LLMFullResponseStartFrame(skip_tts=self._get_skip_tts()))
await self.start_processing_metrics()
params_from_context = self._get_llm_invocation_params(context)
@@ -373,9 +373,9 @@ class AnthropicLLMService(LLMService):
if event.type == "content_block_delta":
if hasattr(event.delta, "text"):
frame = LLMTextFrame(event.delta.text)
frame.includes_inter_frame_spaces = True
await self.push_frame(frame)
await self.push_frame(
LLMTextFrame(event.delta.text, skip_tts=self._get_skip_tts())
)
completion_tokens_estimate += self._estimate_tokens(event.delta.text)
elif hasattr(event.delta, "partial_json") and tool_use_block:
json_accumulator += event.delta.partial_json
@@ -460,11 +460,10 @@ class AnthropicLLMService(LLMService):
except httpx.TimeoutException:
await self._call_event_handler("on_completion_timeout")
except Exception as e:
logger.exception(f"{self} exception: {e}")
await self.push_error(ErrorFrame(f"{e}"))
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
finally:
await self.stop_processing_metrics()
await self.push_frame(LLMFullResponseEndFrame())
await self.push_frame(LLMFullResponseEndFrame(skip_tts=self._get_skip_tts()))
comp_tokens = (
completion_tokens
if not use_completion_tokens_estimate

View File

@@ -206,9 +206,8 @@ class AssemblyAISTTService(STTService):
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} exception: {e}")
self._connected = False
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
raise
async def _disconnect(self):
@@ -233,8 +232,7 @@ class AssemblyAISTTService(STTService):
logger.warning("Timed out waiting for termination message from server")
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
if self._receive_task:
await self.cancel_task(self._receive_task)
@@ -242,8 +240,7 @@ class AssemblyAISTTService(STTService):
await self._websocket.close()
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
finally:
self._websocket = None
@@ -262,13 +259,11 @@ class AssemblyAISTTService(STTService):
except websockets.exceptions.ConnectionClosedOK:
break
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
break
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
def _parse_message(self, message: Dict[str, Any]) -> BaseMessage:
"""Parse a raw message into the appropriate message type."""
@@ -297,8 +292,7 @@ class AssemblyAISTTService(STTService):
elif isinstance(parsed_message, TerminationMessage):
await self._handle_termination(parsed_message)
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
async def _handle_termination(self, message: TerminationMessage):
"""Handle termination message."""

View File

@@ -146,15 +146,6 @@ class AsyncAITTSService(InterruptibleTTSService):
"""
return True
@property
def includes_inter_frame_spaces(self) -> bool:
"""Indicates that AsyncAI TTSTextFrames include necessary inter-frame spaces.
Returns:
True, indicating that AsyncAI's text frames include necessary inter-frame spaces.
"""
return True
def language_to_service_language(self, language: Language) -> Optional[str]:
"""Convert a Language enum to Async language format.
@@ -237,8 +228,7 @@ class AsyncAITTSService(InterruptibleTTSService):
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
@@ -250,8 +240,7 @@ class AsyncAITTSService(InterruptibleTTSService):
logger.debug("Disconnecting from Async")
await self._websocket.close()
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
finally:
self._websocket = None
self._started = False
@@ -296,12 +285,11 @@ class AsyncAITTSService(InterruptibleTTSService):
)
await self.push_frame(frame)
elif msg.get("error_code"):
logger.error(f"{self} error: {msg}")
await self.push_frame(TTSStoppedFrame())
await self.stop_all_metrics()
await self.push_error(ErrorFrame(error=f"{self} error: {msg['message']}"))
await self.push_error(error_msg=f"Error: {msg['message']}")
else:
logger.error(f"{self} error, unknown message type: {msg}")
await self.push_error(error_msg=f"Unknown message type: {msg}")
async def _keepalive_task_handler(self):
"""Send periodic keepalive messages to maintain WebSocket connection."""
@@ -344,16 +332,14 @@ class AsyncAITTSService(InterruptibleTTSService):
await self._get_websocket().send(msg)
await self.start_tts_usage_metrics(text)
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield ErrorFrame(error=f"Unknown error occurred: {e}")
yield TTSStoppedFrame()
await self._disconnect()
await self._connect()
return
yield None
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield ErrorFrame(error=f"Unknown error occurred: {e}")
class AsyncAIHttpTTSService(TTSService):
@@ -433,15 +419,6 @@ class AsyncAIHttpTTSService(TTSService):
"""
return True
@property
def includes_inter_frame_spaces(self) -> bool:
"""Indicates that AsyncAI TTSTextFrames include necessary inter-frame spaces.
Returns:
True, indicating that AsyncAI's text frames include necessary inter-frame spaces.
"""
return True
def language_to_service_language(self, language: Language) -> Optional[str]:
"""Convert a Language enum to Async language format.
@@ -495,8 +472,7 @@ class AsyncAIHttpTTSService(TTSService):
async with self._session.post(url, json=payload, headers=headers) as response:
if response.status != 200:
error_text = await response.text()
logger.error(f"Async API error: {error_text}")
await self.push_error(ErrorFrame(error=f"Async API error: {error_text}"))
await self.push_error(error_msg=f"Async API error: {error_text}")
raise Exception(f"Async API returned status {response.status}: {error_text}")
audio_data = await response.read()
@@ -512,8 +488,7 @@ class AsyncAIHttpTTSService(TTSService):
yield frame
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
finally:
await self.stop_ttfb_metrics()
yield TTSStoppedFrame()

View File

@@ -8,8 +8,10 @@ import sys
from pipecat.services import DeprecatedModuleProxy
from .agent_core import *
from .llm import *
from .nova_sonic import *
from .sagemaker import *
from .stt import *
from .tts import *

View File

@@ -0,0 +1,258 @@
#
# Copyright (c) 2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""AWS AgentCore Processor Module.
This module defines the AWSAgentCoreProcessor, which invokes agents hosted on
Amazon Bedrock AgentCore Runtime and streams their responses as LLMTextFrames.
"""
import asyncio
import json
import os
from typing import Callable, Optional
import aioboto3
from loguru import logger
from pipecat.frames.frames import (
Frame,
LLMContextFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMTextFrame,
)
from pipecat.processors.aggregators.llm_context import LLMContext, LLMSpecificMessage
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
OpenAILLMContextFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
def default_context_to_payload_transformer(
context: LLMContext | OpenAILLMContext,
) -> Optional[str]:
"""Default transformer to create AgentCore payload from LLM context.
Extracts the latest user or system message text and wraps it in {"prompt": "<text>"}.
Args:
context: The LLM context containing conversation messages.
Returns:
A JSON string payload for AgentCore, or None if no valid message found.
"""
messages = context.messages
if not messages:
return None
last_message = messages[-1]
if isinstance(last_message, LLMSpecificMessage) or last_message.get("role") not in (
"user",
"system",
):
return None
content = last_message.get("content")
if not content:
return None
if isinstance(content, str):
prompt = content
elif isinstance(content, list):
prompt = " ".join([part.get("text", "") for part in content])
else:
return None
return json.dumps({"prompt": prompt})
def default_response_to_output_transformer(response_line: str) -> Optional[str]:
"""Default transformer to extract output text from AgentCore response.
Expects responses with {"response": "<text>"} format.
Args:
response_line: The raw response line from AgentCore (without "data: " prefix).
Returns:
The extracted output text, or None if no text found.
"""
response_json = json.loads(response_line)
return response_json.get("response")
class AWSAgentCoreProcessor(FrameProcessor):
"""Processor that runs an Amazon Bedrock AgentCore agent.
Input:
- LLMContextFrame: Supplies a context used to invoke the agent.
Output:
- LLMTextFrame: The agent's text response(s).
A single agent invocation may result in multiple text frames.
This processor transforms the input context to a payload for the AgentCore
agent, and transforms the agent's response(s) into output text frame(s). Both
mappings are configurable via transformers. Below is the default behavior.
Input transformer (context_to_payload_transformer):
- Grabs the latest user or system message (if it's the latest message)
- Extracts its text content
- Constructs a payload that looks like {"prompt": "<text>"}
Output transformer (response_to_output_transformer):
- Expects responses that look like {"response": "<text>"}
- Extracts the text for use in the LLMTextFrame(s)
"""
def __init__(
self,
agentArn: str,
aws_access_key: Optional[str] = None,
aws_secret_key: Optional[str] = None,
aws_session_token: Optional[str] = None,
aws_region: Optional[str] = None,
context_to_payload_transformer: Optional[
Callable[[LLMContext | OpenAILLMContext], Optional[str]]
] = None,
response_to_output_transformer: Optional[Callable[[str], Optional[str]]] = None,
**kwargs,
):
"""Initialize the AWS AgentCore processor.
Args:
agentArn: The Amazon Web Services Resource Name (ARN) of the agent.
aws_access_key: AWS access key ID. If None, uses default credentials.
aws_secret_key: AWS secret access key. If None, uses default credentials.
aws_session_token: AWS session token for temporary credentials.
aws_region: AWS region.
context_to_payload_transformer: Optional callable to transform
LLMContext into AgentCore payload string. If None, uses
default_context_to_payload_transformer.
response_to_output_transformer: Optional callable to extract output text
from AgentCore response. If None, uses
default_response_to_output_transformer.
**kwargs: Additional arguments passed to parent FrameProcessor.
"""
super().__init__(**kwargs)
self._agentArn = agentArn
self._aws_session = aioboto3.Session()
# Store AWS session parameters for creating client in async context
self._aws_params = {
"aws_access_key_id": aws_access_key or os.getenv("AWS_ACCESS_KEY_ID"),
"aws_secret_access_key": aws_secret_key or os.getenv("AWS_SECRET_ACCESS_KEY"),
"aws_session_token": aws_session_token or os.getenv("AWS_SESSION_TOKEN"),
"region_name": aws_region or os.getenv("AWS_REGION", "us-east-1"),
}
# Set transformers with defaults
self._context_to_payload_transformer = (
context_to_payload_transformer or default_context_to_payload_transformer
)
self._response_to_output_transformer = (
response_to_output_transformer or default_response_to_output_transformer
)
# State for managing output response bookends
self._output_response_open = False
self._last_text_frame_time: Optional[float] = None
self._close_task: Optional[asyncio.Task] = None
self._output_response_timeout = 1.0 # seconds
async def _close_output_response_after_timeout(self):
"""Close the output response after timeout if no new text frames arrive."""
await asyncio.sleep(self._output_response_timeout)
if self._output_response_open:
self._output_response_open = False
await self.push_frame(LLMFullResponseEndFrame(skip_tts=self._get_skip_tts()))
async def _push_text_frame(self, text: str):
"""Push a text frame, managing output response bookends."""
# Cancel any pending close task
if self._close_task and not self._close_task.done():
await self.cancel_task(self._close_task)
# Open output response if needed
if not self._output_response_open:
await self.push_frame(LLMFullResponseStartFrame(skip_tts=self._get_skip_tts()))
self._output_response_open = True
# Push the text frame
await self.push_frame(LLMTextFrame(text, skip_tts=self._get_skip_tts()))
self._last_text_frame_time = asyncio.get_event_loop().time()
# Schedule closing the output response after timeout
self._close_task = self.create_task(self._close_output_response_after_timeout())
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process incoming frames and handle LLM message frames.
Args:
frame: The incoming frame to process.
direction: The direction of frame flow in the pipeline.
"""
await super().process_frame(frame, direction)
if isinstance(frame, (LLMContextFrame, OpenAILLMContextFrame)):
# Create payload to invoke AgentCore agent
payload = self._context_to_payload_transformer(frame.context)
if not payload:
return
async with self._aws_session.client("bedrock-agentcore", **self._aws_params) as client:
# Invoke the AgentCore agent
response = await client.invoke_agent_runtime(
agentRuntimeArn=self._agentArn, payload=payload.encode()
)
# Determine if this is a streamed multi-part response, which
# will affect our parsing
is_multi_part_response = "text/event-stream" in response.get("contentType", "")
# Handle each response part (there may be one, for single
# responses, or multiple, for streamed multi-part responses)
async for part in response.get("response", []):
part_string = part.decode("utf-8")
# In streamed multi-part responses, each part might have
# one or more lines, each of which starts with "data: ".
# Treat each line as a response.
if is_multi_part_response:
for line in part_string.split("\n"):
# Get response text from this line
if not line:
continue
if not line.startswith("data: "):
logger.warning(f"Expected line to start with 'data: ', got: {line}")
continue
line = line[6:] # omit "data: "
# Transform response line to output text
text = self._response_to_output_transformer(line)
if text:
await self._push_text_frame(text)
# In single-part responses, the whole part is one response
# and there's no "data: " prefix
else:
# Transform response part string to output text
text = self._response_to_output_transformer(part_string)
if text:
await self._push_text_frame(text)
# Final close if output response is still open after all parts processed
if self._output_response_open:
if self._close_task and not self._close_task.done():
await self.cancel_task(self._close_task)
self._output_response_open = False
await self.push_frame(LLMFullResponseEndFrame(skip_tts=self._get_skip_tts()))
else:
await self.push_frame(frame, direction)

View File

@@ -734,7 +734,7 @@ class AWSBedrockLLMService(LLMService):
aws_access_key: Optional[str] = None,
aws_secret_key: Optional[str] = None,
aws_session_token: Optional[str] = None,
aws_region: str = "us-east-1",
aws_region: Optional[str] = None,
params: Optional[InputParams] = None,
client_config: Optional[Config] = None,
retry_timeout_secs: Optional[float] = 5.0,
@@ -981,7 +981,7 @@ class AWSBedrockLLMService(LLMService):
using_noop_tool = False
try:
await self.push_frame(LLMFullResponseStartFrame())
await self.push_frame(LLMFullResponseStartFrame(skip_tts=self._get_skip_tts()))
await self.start_processing_metrics()
await self.start_ttfb_metrics()
@@ -1078,9 +1078,9 @@ class AWSBedrockLLMService(LLMService):
if "contentBlockDelta" in event:
delta = event["contentBlockDelta"]["delta"]
if "text" in delta:
frame = LLMTextFrame(delta["text"])
frame.includes_inter_frame_spaces = True
await self.push_frame(frame)
await self.push_frame(
LLMTextFrame(delta["text"], skip_tts=self._get_skip_tts())
)
completion_tokens_estimate += self._estimate_tokens(delta["text"])
elif "toolUse" in delta and "input" in delta["toolUse"]:
# Handle partial JSON for tool use
@@ -1138,10 +1138,10 @@ class AWSBedrockLLMService(LLMService):
except (ReadTimeoutError, asyncio.TimeoutError):
await self._call_event_handler("on_completion_timeout")
except Exception as e:
logger.exception(f"{self} exception: {e}")
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
finally:
await self.stop_processing_metrics()
await self.push_frame(LLMFullResponseEndFrame())
await self.push_frame(LLMFullResponseEndFrame(skip_tts=self._get_skip_tts()))
comp_tokens = (
completion_tokens
if not use_completion_tokens_estimate

View File

@@ -27,6 +27,7 @@ from pydantic import BaseModel, Field
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.adapters.services.aws_nova_sonic_adapter import AWSNovaSonicLLMAdapter, Role
from pipecat.frames.frames import (
AggregationType,
BotStoppedSpeakingFrame,
CancelFrame,
EndFrame,
@@ -452,7 +453,7 @@ class AWSNovaSonicLLMService(LLMService):
self._ready_to_send_context = True
await self._finish_connecting_if_context_available()
except Exception as e:
logger.error(f"{self} initialization error: {e}")
await self.push_error(error_msg=f"Initialization error: {e}", exception=e)
await self._disconnect()
async def _process_completed_function_calls(self, send_new_results: bool):
@@ -576,7 +577,7 @@ class AWSNovaSonicLLMService(LLMService):
logger.info("Finished disconnecting")
except Exception as e:
logger.error(f"{self} error disconnecting: {e}")
await self.push_error(error_msg=f"Error disconnecting: {e}", exception=e)
def _create_client(self) -> BedrockRuntimeClient:
config = Config(
@@ -884,7 +885,7 @@ class AWSNovaSonicLLMService(LLMService):
# Errors are kind of expected while disconnecting, so just
# ignore them and do nothing
return
logger.error(f"{self} error processing responses: {e}")
await self.push_error(error_msg=f"Error processing responses: {e}", exception=e)
if self._wants_connection:
await self.reset_conversation()
@@ -1015,7 +1016,7 @@ class AWSNovaSonicLLMService(LLMService):
logger.debug("Assistant response started")
# Report the start of the assistant response.
await self.push_frame(LLMFullResponseStartFrame())
await self.push_frame(LLMFullResponseStartFrame(skip_tts=self._get_skip_tts()))
# Report that equivalent of TTS (this is a speech-to-speech model) started
await self.push_frame(TTSStartedFrame())
@@ -1027,7 +1028,7 @@ class AWSNovaSonicLLMService(LLMService):
logger.debug(f"Assistant response text added: {text}")
# Report the text of the assistant response.
frame = TTSTextFrame(text)
frame = TTSTextFrame(text, aggregated_by=AggregationType.SENTENCE)
frame.includes_inter_frame_spaces = True
await self.push_frame(frame)
@@ -1061,14 +1062,16 @@ class AWSNovaSonicLLMService(LLMService):
# We also need to re-push the LLMFullResponseStartFrame since the
# TTSTextFrame would be ignored otherwise (the interruption frame
# would have cleared the assistant aggregator state).
await self.push_frame(LLMFullResponseStartFrame())
frame = TTSTextFrame(self._assistant_text_buffer)
await self.push_frame(LLMFullResponseStartFrame(skip_tts=self._get_skip_tts()))
frame = TTSTextFrame(
self._assistant_text_buffer, aggregated_by=AggregationType.SENTENCE
)
frame.includes_inter_frame_spaces = True
await self.push_frame(frame)
self._may_need_repush_assistant_text = False
# Report the end of the assistant response.
await self.push_frame(LLMFullResponseEndFrame())
await self.push_frame(LLMFullResponseEndFrame(skip_tts=self._get_skip_tts()))
# Report that equivalent of TTS (this is a speech-to-speech model) stopped.
await self.push_frame(TTSStoppedFrame())

View File

@@ -0,0 +1,283 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""AWS SageMaker bidirectional streaming client.
This module provides a client for streaming bidirectional communication with
SageMaker endpoints using the HTTP/2 protocol. Supports sending audio, text,
and JSON data to SageMaker model endpoints and receiving streaming responses.
"""
import os
from typing import Optional
from loguru import logger
try:
from aws_sdk_sagemaker_runtime_http2.client import SageMakerRuntimeHTTP2Client
from aws_sdk_sagemaker_runtime_http2.config import Config, HTTPAuthSchemeResolver
from aws_sdk_sagemaker_runtime_http2.models import (
InvokeEndpointWithBidirectionalStreamInput,
RequestPayloadPart,
RequestStreamEventPayloadPart,
ResponseStreamEvent,
)
from smithy_aws_core.auth.sigv4 import SigV4AuthScheme
from smithy_aws_core.identity import EnvironmentCredentialsResolver
from smithy_core.aio.eventstream import DuplexEventStream
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
"In order to use SageMaker BiDi client, you need to `pip install pipecat-ai[sagemaker]`."
)
raise Exception(f"Missing module: {e}")
class SageMakerBidiClient:
"""Client for bidirectional streaming with AWS SageMaker endpoints.
Handles low-level HTTP/2 bidirectional streaming protocol for communicating
with SageMaker model endpoints. Provides methods for sending various data
types (audio, text, JSON) and receiving streaming responses.
This client uses AWS SigV4 authentication and supports credential resolution
from environment variables, AWS CLI configuration, and instance metadata.
Example::
client = SageMakerBidiClient(
endpoint_name="my-deepgram-endpoint",
region="us-east-2",
model_invocation_path="v1/listen",
model_query_string="model=nova-3&language=en"
)
await client.start_session()
await client.send_audio_chunk(audio_bytes)
response = await client.receive_response()
await client.close_session()
"""
def __init__(
self,
endpoint_name: str,
region: str,
model_invocation_path: str = "",
model_query_string: str = "",
):
"""Initialize the SageMaker BiDi client.
Args:
endpoint_name: Name of the SageMaker endpoint to connect to.
region: AWS region where the endpoint is deployed.
model_invocation_path: API path for the model invocation (e.g., "v1/listen").
model_query_string: Query string parameters for the model (e.g., "model=nova-3").
"""
self.endpoint_name = endpoint_name
self.region = region
self.model_invocation_path = model_invocation_path
self.model_query_string = model_query_string
self.bidi_endpoint = f"https://runtime.sagemaker.{region}.amazonaws.com:8443"
self._client: Optional[SageMakerRuntimeHTTP2Client] = None
self._stream: Optional[
DuplexEventStream[RequestStreamEventPayloadPart, ResponseStreamEvent, any]
] = None
self._output_stream = None
self._is_active = False
def _initialize_client(self):
"""Initialize the SageMaker Runtime HTTP2 client with AWS credentials.
Creates and configures the SageMaker Runtime HTTP2 client with SigV4
authentication. Attempts to resolve AWS credentials from environment
variables, AWS CLI configuration, or instance metadata.
"""
logger.debug(f"Initializing SageMaker BiDi client for region: {self.region}")
logger.debug(f"Using endpoint URI: {self.bidi_endpoint}")
# Check for AWS credentials
has_env_creds = bool(os.getenv("AWS_ACCESS_KEY_ID") and os.getenv("AWS_SECRET_ACCESS_KEY"))
if not has_env_creds:
logger.warning(
"AWS credentials not found in environment variables. "
"Attempting to use EnvironmentCredentialsResolver which will check "
"AWS CLI configuration and instance metadata."
)
config = Config(
endpoint_uri=self.bidi_endpoint,
region=self.region,
aws_credentials_identity_resolver=EnvironmentCredentialsResolver(),
auth_scheme_resolver=HTTPAuthSchemeResolver(),
auth_schemes={"aws.auth#sigv4": SigV4AuthScheme(service="sagemaker")},
)
self._client = SageMakerRuntimeHTTP2Client(config=config)
async def start_session(self):
"""Start a bidirectional streaming session with the SageMaker endpoint.
Initializes the client if needed, creates the bidirectional stream, and
establishes the connection to the SageMaker endpoint. Must be called
before sending or receiving data.
Returns:
The output stream for receiving responses.
Raises:
RuntimeError: If client initialization or connection fails.
"""
if not self._client:
self._initialize_client()
logger.debug(f"Starting BiDi session with endpoint: {self.endpoint_name}")
logger.debug(f"Model invocation path: {self.model_invocation_path}")
logger.debug(f"Model query string: {self.model_query_string}")
# Create the bidirectional stream
stream_input = InvokeEndpointWithBidirectionalStreamInput(
endpoint_name=self.endpoint_name,
model_invocation_path=self.model_invocation_path,
model_query_string=self.model_query_string,
)
try:
self._stream = await self._client.invoke_endpoint_with_bidirectional_stream(
stream_input
)
self._is_active = True
# Get output stream
output = await self._stream.await_output()
self._output_stream = output[1]
logger.debug("BiDi session started successfully")
return self._output_stream
except Exception as e:
logger.error(f"Failed to start BiDi session: {e}")
self._is_active = False
raise RuntimeError(f"Failed to start SageMaker BiDi session: {e}")
async def send_data(self, data_bytes: bytes, data_type: Optional[str] = None):
"""Send a chunk of data to the stream.
Generic method for sending any type of data to the SageMaker endpoint.
Use the convenience methods (send_audio_chunk, send_text, send_json)
for common data types.
Args:
data_bytes: Raw bytes to send.
data_type: Optional data type header. Common values are "BINARY" for
audio/binary data and "UTF8" for text/JSON data.
Raises:
RuntimeError: If session is not active or send fails.
"""
if not self._is_active or not self._stream:
raise RuntimeError("BiDi session not active")
try:
payload = RequestPayloadPart(bytes_=data_bytes, data_type=data_type)
event = RequestStreamEventPayloadPart(value=payload)
await self._stream.input_stream.send(event)
except Exception as e:
logger.error(f"Failed to send data: {e}")
raise
async def send_audio_chunk(self, audio_bytes: bytes):
"""Send a chunk of audio data to the stream.
Convenience method for sending audio data. Automatically sets the data
type to "BINARY".
Args:
audio_bytes: Raw audio bytes to send (e.g., PCM audio data).
Raises:
RuntimeError: If session is not active or send fails.
"""
await self.send_data(audio_bytes, data_type="BINARY")
async def send_text(self, text: str):
"""Send text data to the stream.
Convenience method for sending text data. Automatically encodes the text
as UTF-8 and sets the data type to "UTF8".
Args:
text: Text string to send.
Raises:
RuntimeError: If session is not active or send fails.
"""
await self.send_data(text.encode("utf-8"), data_type="UTF8")
async def send_json(self, data: dict):
"""Send JSON data to the stream.
Convenience method for sending JSON-encoded messages. Useful for control
messages like KeepAlive or CloseStream. Automatically serializes the
dictionary to JSON, encodes as UTF-8, and sets the data type to "UTF8".
Args:
data: Dictionary to send as JSON (e.g., {"type": "KeepAlive"}).
Raises:
RuntimeError: If session is not active or send fails.
"""
import json
await self.send_data(json.dumps(data).encode("utf-8"), data_type="UTF8")
async def receive_response(self) -> Optional[ResponseStreamEvent]:
"""Receive a response from the stream.
Blocks until a response is available from the SageMaker endpoint. Returns
None when the stream is closed.
Returns:
The response event containing payload data, or None if stream is closed.
Raises:
RuntimeError: If session is not active.
"""
if not self._is_active or not self._output_stream:
raise RuntimeError("BiDi session not active")
try:
result = await self._output_stream.receive()
return result
except Exception as e:
logger.error(f"Failed to receive response: {e}")
raise
async def close_session(self):
"""Close the bidirectional streaming session.
Gracefully closes the input stream and marks the session as inactive.
Safe to call multiple times.
"""
if not self._is_active:
return
logger.debug("Closing BiDi session...")
self._is_active = False
try:
if self._stream:
await self._stream.input_stream.close()
logger.debug("BiDi session closed successfully")
except Exception as e:
logger.warning(f"Error closing BiDi session: {e}")
@property
def is_active(self) -> bool:
"""Check if the session is currently active.
Returns:
True if session is active, False otherwise.
"""
return self._is_active

View File

@@ -140,8 +140,7 @@ class AWSTranscribeSTTService(STTService):
return
logger.warning("WebSocket connection not established after connect")
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
retry_count += 1
if retry_count < max_retries:
await asyncio.sleep(1) # Wait before retrying
@@ -182,8 +181,7 @@ class AWSTranscribeSTTService(STTService):
try:
await self._connect()
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield ErrorFrame(error=f"Unknown error occurred: {e}")
return
# Format the audio data according to AWS event stream format
@@ -200,13 +198,11 @@ class AWSTranscribeSTTService(STTService):
await self._disconnect()
# Don't yield error here - we'll retry on next frame
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield ErrorFrame(error=f"Unknown error occurred: {e}")
await self._disconnect()
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield ErrorFrame(error=f"Unknown error occurred: {e}")
await self._disconnect()
async def _connect(self):
@@ -289,8 +285,7 @@ class AWSTranscribeSTTService(STTService):
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
await self._disconnect()
raise
@@ -310,8 +305,7 @@ class AWSTranscribeSTTService(STTService):
await self._ws_client.send(json.dumps(end_stream))
await self._ws_client.close()
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
finally:
self._ws_client = None
await self._call_event_handler("on_disconnected")
@@ -529,15 +523,15 @@ class AWSTranscribeSTTService(STTService):
)
elif headers.get(":message-type") == "exception":
error_msg = payload.get("Message", "Unknown error")
logger.error(f"{self} Exception from AWS: {error_msg}")
await self.push_frame(ErrorFrame(f"AWS Transcribe error: {error_msg}"))
await self.push_error(error_msg=f"AWS Transcribe error: {error_msg}")
else:
logger.debug(f"{self} Other message type received: {headers}")
logger.debug(f"{self} Payload: {payload}")
except websockets.exceptions.ConnectionClosed as e:
logger.error(f"{self} WebSocket connection closed in receive loop: {e}")
await self.push_error(
error_msg=f"WebSocket connection closed in receive loop", exception=e
)
break
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
break

View File

@@ -209,15 +209,6 @@ class AWSPollyTTSService(TTSService):
"""
return True
@property
def includes_inter_frame_spaces(self) -> bool:
"""Indicates that AWS TTSTextFrames include necessary inter-frame spaces.
Returns:
True, indicating that AWS's text frames include necessary inter-frame spaces.
"""
return True
def language_to_service_language(self, language: Language) -> Optional[str]:
"""Convert a Language enum to AWS Polly language format.
@@ -321,7 +312,6 @@ class AWSPollyTTSService(TTSService):
yield TTSStoppedFrame()
except (BotoCoreError, ClientError) as error:
logger.exception(f"{self} error generating TTS: {error}")
error_message = f"AWS Polly TTS error: {str(error)}"
yield ErrorFrame(error=error_message)

View File

@@ -91,7 +91,6 @@ class AzureImageGenServiceREST(ImageGenService):
while status != "succeeded":
attempts_left -= 1
if attempts_left == 0:
logger.error(f"{self} error: image generation timed out")
yield ErrorFrame("Image generation timed out")
return
@@ -104,7 +103,6 @@ class AzureImageGenServiceREST(ImageGenService):
image_url = json_response["result"]["data"][0]["url"] if json_response else None
if not image_url:
logger.error(f"{self} error: image generation failed")
yield ErrorFrame("Image generation failed")
return

View File

@@ -61,5 +61,5 @@ class AzureRealtimeLLMService(OpenAIRealtimeLLMService):
)
self._receive_task = self.create_task(self._receive_task_handler())
except Exception as e:
logger.error(f"{self} initialization error: {e}")
await self.push_error(error_msg=f"initialization error: {e}", exception=e)
self._websocket = None

View File

@@ -121,8 +121,7 @@ class AzureSTTService(STTService):
self._audio_stream.write(audio)
yield None
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield ErrorFrame(error=f"Unknown error occurred: {e}")
async def start(self, frame: StartFrame):
"""Start the speech recognition service.
@@ -151,8 +150,9 @@ class AzureSTTService(STTService):
self._speech_recognizer.recognized.connect(self._on_handle_recognized)
self._speech_recognizer.start_continuous_recognition_async()
except Exception as e:
logger.error(f"{self} exception during initialization: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(
error_msg=f"Uncaught exception during initialization: {e}", exception=e
)
async def stop(self, frame: EndFrame):
"""Stop the speech recognition service.

View File

@@ -151,15 +151,6 @@ class AzureBaseTTSService(TTSService):
"""
return True
@property
def includes_inter_frame_spaces(self) -> bool:
"""Indicates that Azure TTSTextFrames include necessary inter-frame spaces.
Returns:
True, indicating that Azure's text frames include necessary inter-frame spaces.
"""
return True
def language_to_service_language(self, language: Language) -> Optional[str]:
"""Convert a Language enum to Azure language format.
@@ -336,7 +327,6 @@ class AzureTTSService(AzureBaseTTSService):
try:
if self._speech_synthesizer is None:
error_msg = "Speech synthesizer not initialized."
logger.error(error_msg)
yield ErrorFrame(error=error_msg)
return
@@ -364,15 +354,13 @@ class AzureTTSService(AzureBaseTTSService):
yield TTSStoppedFrame()
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield ErrorFrame(error=f"Unknown error occurred: {e}")
yield TTSStoppedFrame()
# Could add reconnection logic here if needed
return
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield ErrorFrame(error=f"Unknown error occurred: {e}")
class AzureHttpTTSService(AzureBaseTTSService):
@@ -449,5 +437,6 @@ class AzureHttpTTSService(AzureBaseTTSService):
cancellation_details = result.cancellation_details
logger.warning(f"Speech synthesis canceled: {cancellation_details.reason}")
if cancellation_details.reason == CancellationReason.Error:
logger.error(f"{self} error: {cancellation_details.error_details}")
yield ErrorFrame(error=f"{self} error: {cancellation_details.error_details}")
yield ErrorFrame(
error=f"Unknown error occurred: {cancellation_details.error_details}"
)

View File

@@ -276,8 +276,7 @@ class CartesiaSTTService(WebsocketSTTService):
self._websocket = await websocket_connect(ws_url, additional_headers=headers)
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
async def _disconnect_websocket(self):
try:
@@ -285,8 +284,7 @@ class CartesiaSTTService(WebsocketSTTService):
logger.debug("Disconnecting from Cartesia STT")
await self._websocket.close()
except Exception as e:
logger.error(f"{self} error closing websocket: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Error closing websocket: {e}", exception=e)
finally:
self._websocket = None
await self._call_event_handler("on_disconnected")
@@ -319,8 +317,7 @@ class CartesiaSTTService(WebsocketSTTService):
elif data["type"] == "error":
error_msg = data.get("message", "Unknown error")
logger.error(f"Cartesia error: {error_msg}")
await self.push_error(ErrorFrame(error=error_msg))
await self.push_error(error_msg=error_msg)
@traced_stt
async def _handle_transcription(

View File

@@ -10,7 +10,8 @@ import base64
import json
import uuid
import warnings
from typing import AsyncGenerator, List, Literal, Optional, Union
from enum import Enum
from typing import AsyncGenerator, List, Literal, Optional
from loguru import logger
from pydantic import BaseModel, Field
@@ -125,6 +126,72 @@ def language_to_cartesia_language(language: Language) -> Optional[str]:
return resolve_language(language, LANGUAGE_MAP, use_base_code=True)
class CartesiaEmotion(str, Enum):
"""Predefined Emotions supported by Cartesia."""
# Primary emotions supported by Cartesia
NEUTRAL = "neutral"
ANGRY = "angry"
EXCITED = "excited"
CONTENT = "content"
SAD = "sad"
SCARED = "scared"
# Additional emotions supported by Cartesia
HAPPY = "happy"
ENTHUSIASTIC = "enthusiastic"
ELATED = "elated"
EUPHORIC = "euphoric"
TRIUMPHANT = "triumphant"
AMAZED = "amazed"
SURPRISED = "surprised"
FLIRTATIOUS = "flirtatious"
JOKING_COMEDIC = "joking/comedic"
CURIOUS = "curious"
PEACEFUL = "peaceful"
SERENE = "serene"
CALM = "calm"
GRATEFUL = "grateful"
AFFECTIONATE = "affectionate"
TRUST = "trust"
SYMPATHETIC = "sympathetic"
ANTICIPATION = "anticipation"
MYSTERIOUS = "mysterious"
MAD = "mad"
OUTRAGED = "outraged"
FRUSTRATED = "frustrated"
AGITATED = "agitated"
THREATENED = "threatened"
DISGUSTED = "disgusted"
CONTEMPT = "contempt"
ENVIOUS = "envious"
SARCASTIC = "sarcastic"
IRONIC = "ironic"
DEJECTED = "dejected"
MELANCHOLIC = "melancholic"
DISAPPOINTED = "disappointed"
HURT = "hurt"
GUILTY = "guilty"
BORED = "bored"
TIRED = "tired"
REJECTED = "rejected"
NOSTALGIC = "nostalgic"
WISTFUL = "wistful"
APOLOGETIC = "apologetic"
HESITANT = "hesitant"
INSECURE = "insecure"
CONFUSED = "confused"
RESIGNED = "resigned"
ANXIOUS = "anxious"
PANICKED = "panicked"
ALARMED = "alarmed"
PROUD = "proud"
CONFIDENT = "confident"
DISTANT = "distant"
SKEPTICAL = "skeptical"
CONTEMPLATIVE = "contemplative"
DETERMINED = "determined"
class CartesiaTTSService(AudioContextWordTTSService):
"""Cartesia TTS service with WebSocket streaming and word timestamps.
@@ -182,6 +249,10 @@ class CartesiaTTSService(AudioContextWordTTSService):
container: Audio container format.
params: Additional input parameters for voice customization.
text_aggregator: Custom text aggregator for processing input text.
.. deprecated:: 0.0.95
Use an LLMTextProcessor before the TTSService for custom text aggregation.
aggregate_sentences: Whether to aggregate sentences within the TTSService.
**kwargs: Additional arguments passed to the parent service.
"""
@@ -200,10 +271,18 @@ class CartesiaTTSService(AudioContextWordTTSService):
push_text_frames=False,
pause_frame_processing=True,
sample_rate=sample_rate,
text_aggregator=text_aggregator or SkipTagsAggregator([("<spell>", "</spell>")]),
text_aggregator=text_aggregator,
**kwargs,
)
if not text_aggregator:
# Always skip tags added for spelled-out text
# Note: This is primarily to support backwards compatibility.
# The preferred way of taking advantage of Cartesia SSML Tags is
# to use an LLMTextProcessor and/or a text_transformer to identify
# and insert these tags for the purpose of the TTS service alone.
self._text_aggregator = SkipTagsAggregator([("<spell>", "</spell>")])
params = params or CartesiaTTSService.InputParams()
self._api_key = api_key
@@ -257,6 +336,27 @@ class CartesiaTTSService(AudioContextWordTTSService):
"""
return language_to_cartesia_language(language)
# A set of Cartesia-specific helpers for text transformations
def SPELL(text: str) -> str:
"""Wrap text in Cartesia spell tag."""
return f"<spell>{text}</spell>"
def EMOTION_TAG(emotion: CartesiaEmotion) -> str:
"""Convenience method to create an emotion tag."""
return f'<emotion value="{emotion}" />'
def PAUSE_TAG(seconds: float) -> str:
"""Convenience method to create a pause tag."""
return f'<break time="{seconds}s" />'
def VOLUME_TAG(volume: float) -> str:
"""Convenience method to create a volume tag."""
return f'<volume ratio="{volume}" />'
def SPEED_TAG(speed: float) -> str:
"""Convenience method to create a speed tag."""
return f'<speed ratio="{speed}" />'
def _is_cjk_language(self, language: str) -> bool:
"""Check if the given language is CJK (Chinese, Japanese, Korean).
@@ -397,8 +497,7 @@ class CartesiaTTSService(AudioContextWordTTSService):
)
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
@@ -410,8 +509,7 @@ class CartesiaTTSService(AudioContextWordTTSService):
logger.debug("Disconnecting from Cartesia")
await self._websocket.close()
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
finally:
self._context_id = None
self._websocket = None
@@ -464,13 +562,12 @@ class CartesiaTTSService(AudioContextWordTTSService):
)
await self.append_to_audio_context(msg["context_id"], frame)
elif msg["type"] == "error":
logger.error(f"{self} error: {msg}")
await self.push_frame(TTSStoppedFrame())
await self.stop_all_metrics()
await self.push_error(ErrorFrame(error=f"{self} error: {msg['error']}"))
await self.push_error(error_msg=f"Error: {msg}")
self._context_id = None
else:
logger.error(f"{self} error, unknown message type: {msg}")
await self.push_error(error_msg=f"Error, unknown message type: {msg}")
async def _receive_messages(self):
while True:
@@ -508,16 +605,14 @@ class CartesiaTTSService(AudioContextWordTTSService):
await self._get_websocket().send(msg)
await self.start_tts_usage_metrics(text)
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield ErrorFrame(error=f"Unknown error occurred: {e}")
yield TTSStoppedFrame()
await self._disconnect()
await self._connect()
return
yield None
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield ErrorFrame(error=f"Unknown error occurred: {e}")
class CartesiaHttpTTSService(TTSService):
@@ -708,8 +803,7 @@ class CartesiaHttpTTSService(TTSService):
async with session.post(url, json=payload, headers=headers) as response:
if response.status != 200:
error_text = await response.text()
logger.error(f"Cartesia API error: {error_text}")
await self.push_error(ErrorFrame(error=f"Cartesia API error: {error_text}"))
yield ErrorFrame(error=f"Cartesia API error: {error_text}")
raise Exception(f"Cartesia API returned status {response.status}: {error_text}")
audio_data = await response.read()
@@ -725,8 +819,7 @@ class CartesiaHttpTTSService(TTSService):
yield frame
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
yield ErrorFrame(error=f"Unknown error occurred: {e}")
finally:
await self.stop_ttfb_metrics()
yield TTSStoppedFrame()

View File

@@ -6,7 +6,9 @@
"""Deepgram Flux speech-to-text service implementation."""
import asyncio
import json
import time
from enum import Enum
from typing import Any, AsyncGenerator, Dict, Optional
from urllib.parse import urlencode
@@ -94,6 +96,7 @@ class DeepgramFluxSTTService(WebsocketSTTService):
mip_opt_out: Optional. Opts out requests from the Deepgram Model Improvement Program
(default False).
tag: List of tags to label requests for identification during usage reporting.
min_confidence: Optional. Minimum confidence required confidence to create a TranscriptionFrame
"""
eager_eot_threshold: Optional[float] = None
@@ -102,6 +105,7 @@ class DeepgramFluxSTTService(WebsocketSTTService):
keyterm: list = []
mip_opt_out: Optional[bool] = None
tag: list = []
min_confidence: Optional[float] = None # New parameter
def __init__(
self,
@@ -146,7 +150,17 @@ class DeepgramFluxSTTService(WebsocketSTTService):
params=params
)
"""
super().__init__(sample_rate=sample_rate, **kwargs)
# Note: For DeepgramFluxSTTService, differently from other processes, we need to create
# the _receive_task inside _connect_websocket, because the websocket should only be
# considered connected and ready to send audio once we receive from Flux the message
# which confirms the connection has been established.
# If we try to keep the logic reconnect_on_error, when receiving a message, the
# _receive_task_handler would try to reconnect in case of error, invoking the
# _connect_websocket again and leading to a case where the first _receive_task_handler
# was never destroyed.
# So we can keep it here as false, because inside the method send_with_retry, it will
# already try to reconnect if needed.
super().__init__(sample_rate=sample_rate, reconnect_on_error=False, **kwargs)
self._api_key = api_key
self._url = url
@@ -163,6 +177,13 @@ class DeepgramFluxSTTService(WebsocketSTTService):
self._register_event_handler("on_end_of_turn")
self._register_event_handler("on_eager_end_of_turn")
self._register_event_handler("on_update")
self._connection_established_event = asyncio.Event()
# Watchdog task to prevent dangling tasks
# If we stop sending audio to Flux after we have received that the User has started speaking
# we never receive the user stopped speaking event unless we resume sending audio to it.
self._last_stt_time = None
self._watchdog_task = None
self._user_is_speaking = False
async def _connect(self):
"""Connect to WebSocket and start background tasks.
@@ -172,9 +193,6 @@ class DeepgramFluxSTTService(WebsocketSTTService):
"""
await self._connect_websocket()
if self._websocket and not self._receive_task:
self._receive_task = self.create_task(self._receive_task_handler(self._report_error))
async def _disconnect(self):
"""Disconnect from WebSocket and clean up tasks.
@@ -182,21 +200,32 @@ class DeepgramFluxSTTService(WebsocketSTTService):
and cleans up resources to prevent memory leaks.
"""
try:
# Cancel background tasks BEFORE closing websocket
if self._receive_task:
await self.cancel_task(self._receive_task, timeout=2.0)
self._receive_task = None
# Now close the websocket
await self._disconnect_websocket()
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
finally:
# Reset state only after everything is cleaned up
self._websocket = None
async def _send_silence(self, duration_secs: float = 0.5):
"""Send a block of silence of the specified duration (default 500 ms)."""
sample_width = 2 # bytes per sample for 16-bit PCM
num_channels = 1 # mono
num_samples = int(self.sample_rate * duration_secs)
silence = b"\x00" * (num_samples * sample_width * num_channels)
await self._websocket.send(silence)
async def _watchdog_task_handler(self):
while self._websocket and self._websocket.state is State.OPEN:
now = time.monotonic()
# More than 500 ms without sending new audio to Flux
if self._user_is_speaking and self._last_stt_time and now - self._last_stt_time > 0.5:
logger.warning("Sending silence to Flux to prevent dangling task")
await self._send_silence()
self._last_stt_time = time.monotonic()
# check every 100ms
await asyncio.sleep(0.1)
async def _connect_websocket(self):
"""Establish WebSocket connection to API.
@@ -208,15 +237,30 @@ class DeepgramFluxSTTService(WebsocketSTTService):
if self._websocket and self._websocket.state is State.OPEN:
return
self._connection_established_event.clear()
self._user_is_speaking = False
self._websocket = await websocket_connect(
self._websocket_url,
additional_headers={"Authorization": f"Token {self._api_key}"},
)
# Creating the receiver task
if not self._receive_task:
self._receive_task = self.create_task(
self._receive_task_handler(self._report_error)
)
# Creating the watchdog task
if not self._watchdog_task:
self._watchdog_task = self.create_task(self._watchdog_task_handler())
# Now wait for the connection established event
logger.debug("WebSocket connected, waiting for server confirmation...")
await self._connection_established_event.wait()
logger.debug("Connected to Deepgram Flux Websocket")
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
@@ -227,6 +271,16 @@ class DeepgramFluxSTTService(WebsocketSTTService):
metrics collection. Handles disconnection errors gracefully.
"""
try:
# Cancel background tasks BEFORE closing websocket
if self._receive_task:
await self.cancel_task(self._receive_task, timeout=2.0)
self._receive_task = None
if self._watchdog_task:
await self.cancel_task(self._watchdog_task, timeout=2.0)
self._watchdog_task = None
self._last_stt_time = None
self._connection_established_event.clear()
await self.stop_all_metrics()
if self._websocket:
@@ -234,8 +288,7 @@ class DeepgramFluxSTTService(WebsocketSTTService):
logger.debug("Disconnecting from Deepgram Flux Websocket")
await self._websocket.close()
except Exception as e:
logger.error(f"{self} error closing websocket: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Error closing websocket: {e}", exception=e)
finally:
self._websocket = None
await self._call_event_handler("on_disconnected")
@@ -245,10 +298,13 @@ class DeepgramFluxSTTService(WebsocketSTTService):
This signals to the server that no more audio data will be sent.
"""
if self._websocket:
logger.debug("Sending CloseStream message to Deepgram Flux")
message = {"type": "CloseStream"}
await self._websocket.send(json.dumps(message))
try:
if self._websocket:
logger.debug("Sending CloseStream message to Deepgram Flux")
message = {"type": "CloseStream"}
await self._websocket.send(json.dumps(message))
except Exception as e:
await self.push_error(error_msg=f"Error sending closeStream: {e}", exception=e)
def can_generate_metrics(self) -> bool:
"""Check if this service can generate processing metrics.
@@ -335,15 +391,13 @@ class DeepgramFluxSTTService(WebsocketSTTService):
are issues sending the audio data.
"""
if not self._websocket:
logger.error("Not connected to Deepgram Flux.")
yield ErrorFrame("Not connected to Deepgram Flux.")
return
try:
await self._websocket.send(audio)
self._last_stt_time = time.monotonic()
await self.send_with_retry(audio, self._report_error)
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield ErrorFrame(error=f"Unknown error occurred: {e}")
return
yield None
@@ -420,8 +474,7 @@ class DeepgramFluxSTTService(WebsocketSTTService):
# Skip malformed messages
continue
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
# Error will be handled inside WebsocketService->_receive_task_handler
raise
else:
@@ -463,6 +516,8 @@ class DeepgramFluxSTTService(WebsocketSTTService):
transcription processing.
"""
logger.info("Connected to Flux - ready to stream audio")
# Notify connection is established
self._connection_established_event.set()
async def _handle_fatal_error(self, data: Dict[str, Any]):
"""Handle fatal error messages from Deepgram Flux.
@@ -530,6 +585,7 @@ class DeepgramFluxSTTService(WebsocketSTTService):
transcript: maybe the first few words of the turn.
"""
logger.debug("User started speaking")
self._user_is_speaking = True
await self.push_interruption_task_frame_and_wait()
await self.broadcast_frame(UserStartedSpeakingFrame)
await self.start_metrics()
@@ -550,6 +606,22 @@ class DeepgramFluxSTTService(WebsocketSTTService):
logger.trace(f"Received event TurnResumed: {event}")
await self._call_event_handler("on_turn_resumed")
def _calculate_average_confidence(self, transcript_data) -> Optional[float]:
"""Calculate the average confidence from transcript data.
Return None if the data is missing or invalid.
"""
# Example: Assume transcript_data has a list of words with confidence
words = transcript_data.get("words")
if not words or not isinstance(words, list):
return None
confidences = [
w.get("confidence") for w in words if isinstance(w.get("confidence"), (float, int))
]
if not confidences:
return None
return sum(confidences) / len(confidences)
async def _handle_end_of_turn(self, transcript: str, data: Dict[str, Any]):
"""Handle EndOfTurn events from Deepgram Flux.
@@ -569,16 +641,26 @@ class DeepgramFluxSTTService(WebsocketSTTService):
data: The TurnInfo message data containing event type, transcript and some extra metadata.
"""
logger.debug("User stopped speaking")
self._user_is_speaking = False
await self.push_frame(
TranscriptionFrame(
transcript,
self._user_id,
time_now_iso8601(),
self._language,
result=data,
# Compute the average confidence
average_confidence = self._calculate_average_confidence(data)
if not self._params.min_confidence or average_confidence > self._params.min_confidence:
await self.push_frame(
TranscriptionFrame(
transcript,
self._user_id,
time_now_iso8601(),
self._language,
result=data,
)
)
)
else:
logger.warning(
f"Transcription confidence below min_confidence threshold: {average_confidence}"
)
await self._handle_transcription(transcript, True, self._language)
await self.stop_processing_metrics()
await self.push_frame(UserStoppedSpeakingFrame(), FrameDirection.DOWNSTREAM)

View File

@@ -233,7 +233,7 @@ class DeepgramSTTService(STTService):
)
if not await self._connection.start(options=self._settings, addons=self._addons):
logger.error(f"{self}: unable to connect to Deepgram")
await self.push_error(error_msg=f"Unable to connect to Deepgram")
async def _disconnect(self):
if await self._connection.is_connected():
@@ -256,7 +256,7 @@ class DeepgramSTTService(STTService):
async def _on_error(self, *args, **kwargs):
error: ErrorResponse = kwargs["error"]
logger.warning(f"{self} connection error, will retry: {error}")
await self.push_error(ErrorFrame(error=f"{error}"))
await self.push_error(error_msg=f"{error}")
await self.stop_all_metrics()
# NOTE(aleix): we don't disconnect (i.e. call finish on the connection)
# because this triggers more errors internally in the Deepgram SDK. So,

View File

@@ -0,0 +1,444 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Deepgram speech-to-text service for AWS SageMaker.
This module provides a Pipecat STT service that connects to Deepgram models
deployed on AWS SageMaker endpoints. Uses HTTP/2 bidirectional streaming for
low-latency real-time transcription with support for interim results, multiple
languages, and various Deepgram features.
"""
import asyncio
import json
from typing import AsyncGenerator, Optional
from loguru import logger
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
InterimTranscriptionFrame,
StartFrame,
TranscriptionFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.aws.sagemaker.bidi_client import SageMakerBidiClient
from pipecat.services.stt_service import STTService
from pipecat.transcriptions.language import Language
from pipecat.utils.time import time_now_iso8601
from pipecat.utils.tracing.service_decorators import traced_stt
try:
from deepgram import LiveOptions
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
"In order to use DeepgramSageMakerSTTService, you need to `pip install pipecat-ai[deepgram,sagemaker]`."
)
raise Exception(f"Missing module: {e}")
class DeepgramSageMakerSTTService(STTService):
"""Deepgram speech-to-text service for AWS SageMaker.
Provides real-time speech recognition using Deepgram models deployed on
AWS SageMaker endpoints. Uses HTTP/2 bidirectional streaming for low-latency
transcription with support for interim results, speaker diarization, and
multiple languages.
Requirements:
- AWS credentials configured (via environment variables, AWS CLI, or instance metadata)
- A deployed SageMaker endpoint with Deepgram model: https://developers.deepgram.com/docs/deploy-amazon-sagemaker
- Deepgram SDK for LiveOptions configuration
Example::
stt = DeepgramSageMakerSTTService(
endpoint_name="my-deepgram-endpoint",
region="us-east-2",
live_options=LiveOptions(
model="nova-3",
language="en",
interim_results=True,
punctuate=True,
),
)
"""
def __init__(
self,
*,
endpoint_name: str,
region: str,
sample_rate: Optional[int] = None,
live_options: Optional[LiveOptions] = None,
**kwargs,
):
"""Initialize the Deepgram SageMaker STT service.
Args:
endpoint_name: Name of the SageMaker endpoint with Deepgram model
deployed (e.g., "my-deepgram-nova-3-endpoint").
region: AWS region where the endpoint is deployed (e.g., "us-east-2").
sample_rate: Audio sample rate in Hz. If None, uses value from
live_options or defaults to the value from StartFrame.
live_options: Deepgram LiveOptions for detailed configuration. If None,
uses sensible defaults (nova-3 model, English, interim results enabled).
**kwargs: Additional arguments passed to the parent STTService.
"""
sample_rate = sample_rate or (live_options.sample_rate if live_options else None)
super().__init__(sample_rate=sample_rate, **kwargs)
self._endpoint_name = endpoint_name
self._region = region
# Create default options similar to DeepgramSTTService
default_options = LiveOptions(
encoding="linear16",
language=Language.EN,
model="nova-3",
channels=1,
interim_results=True,
punctuate=True,
)
# Merge with provided options
merged_options = default_options.to_dict()
if live_options:
default_model = default_options.model
merged_options.update(live_options.to_dict())
# Handle the "None" string bug from deepgram-sdk
if "model" in merged_options and merged_options["model"] == "None":
merged_options["model"] = default_model
# Convert Language enum to string if needed
if "language" in merged_options and isinstance(merged_options["language"], Language):
merged_options["language"] = merged_options["language"].value
self.set_model_name(merged_options["model"])
self._settings = merged_options
self._client: Optional[SageMakerBidiClient] = None
self._response_task: Optional[asyncio.Task] = None
self._keepalive_task: Optional[asyncio.Task] = None
def can_generate_metrics(self) -> bool:
"""Check if this service can generate processing metrics.
Returns:
True, as Deepgram SageMaker service supports metrics generation.
"""
return True
async def set_model(self, model: str):
"""Set the Deepgram model and reconnect.
Disconnects from the current session, updates the model setting, and
establishes a new connection with the updated model.
Args:
model: The Deepgram model name to use (e.g., "nova-3").
"""
await super().set_model(model)
logger.info(f"Switching STT model to: [{model}]")
self._settings["model"] = model
await self._disconnect()
await self._connect()
async def set_language(self, language: Language):
"""Set the recognition language and reconnect.
Disconnects from the current session, updates the language setting, and
establishes a new connection with the updated language.
Args:
language: The language to use for speech recognition (e.g., Language.EN,
Language.ES).
"""
logger.info(f"Switching STT language to: [{language}]")
self._settings["language"] = language
await self._disconnect()
await self._connect()
async def start(self, frame: StartFrame):
"""Start the Deepgram SageMaker STT service.
Args:
frame: The start frame containing initialization parameters.
"""
await super().start(frame)
self._settings["sample_rate"] = self.sample_rate
await self._connect()
async def stop(self, frame: EndFrame):
"""Stop the Deepgram SageMaker STT service.
Args:
frame: The end frame.
"""
await super().stop(frame)
await self._disconnect()
async def cancel(self, frame: CancelFrame):
"""Cancel the Deepgram SageMaker STT service.
Args:
frame: The cancel frame.
"""
await super().cancel(frame)
await self._disconnect()
async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
"""Send audio data to Deepgram for transcription.
Args:
audio: Raw audio bytes to transcribe.
Yields:
Frame: None (transcription results come via BiDi stream callbacks).
"""
if self._client and self._client.is_active:
try:
await self._client.send_audio_chunk(audio)
except Exception as e:
yield ErrorFrame(error=f"Unknown error occurred: {e}")
yield None
async def _connect(self):
"""Connect to the SageMaker endpoint and start the BiDi session.
Builds the Deepgram query string from settings, creates the BiDi client,
starts the streaming session, and launches background tasks for processing
responses and sending KeepAlive messages.
"""
logger.debug("Connecting to Deepgram on SageMaker...")
# Update sample rate in settings
self._settings["sample_rate"] = self.sample_rate
# Build query string from settings, converting booleans to strings
query_params = {}
for key, value in self._settings.items():
if value is not None:
# Convert boolean values to lowercase strings for Deepgram API
if isinstance(value, bool):
query_params[key] = str(value).lower()
else:
query_params[key] = str(value)
query_string = "&".join(f"{k}={v}" for k, v in query_params.items())
# Create BiDi client
self._client = SageMakerBidiClient(
endpoint_name=self._endpoint_name,
region=self._region,
model_invocation_path="v1/listen",
model_query_string=query_string,
)
try:
# Start the session
await self._client.start_session()
# Start processing responses in the background
self._response_task = self.create_task(self._process_responses())
# Start keepalive task to maintain connection
self._keepalive_task = self.create_task(self._send_keepalive())
logger.debug("Connected to Deepgram on SageMaker")
await self._call_event_handler("on_connected")
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
await self._call_event_handler("on_connection_error", str(e))
async def _disconnect(self):
"""Disconnect from the SageMaker endpoint.
Sends a CloseStream message to Deepgram, cancels background tasks
(KeepAlive and response processing), and closes the BiDi session.
Safe to call multiple times.
"""
if self._client and self._client.is_active:
logger.debug("Disconnecting from Deepgram on SageMaker...")
# Send CloseStream message to Deepgram
try:
await self._client.send_json({"type": "CloseStream"})
except Exception as e:
logger.warning(f"Failed to send CloseStream message: {e}")
# Cancel keepalive task
if self._keepalive_task and not self._keepalive_task.done():
await self.cancel_task(self._keepalive_task)
# Cancel response processing task
if self._response_task and not self._response_task.done():
await self.cancel_task(self._response_task)
# Close the BiDi session
await self._client.close_session()
logger.debug("Disconnected from Deepgram on SageMaker")
await self._call_event_handler("on_disconnected")
async def _send_keepalive(self):
"""Send periodic KeepAlive messages to maintain the connection.
Sends a KeepAlive JSON message to Deepgram every 5 seconds while the
connection is active. This prevents the connection from timing out during
periods of silence.
"""
while self._client and self._client.is_active:
await asyncio.sleep(5)
if self._client and self._client.is_active:
try:
await self._client.send_json({"type": "KeepAlive"})
except Exception as e:
logger.warning(f"Failed to send KeepAlive: {e}")
async def _process_responses(self):
"""Process streaming responses from Deepgram on SageMaker.
Continuously receives responses from the BiDi stream, decodes the payload,
parses JSON responses from Deepgram, and processes transcription results.
Runs as a background task until the connection is closed or cancelled.
"""
try:
while self._client and self._client.is_active:
result = await self._client.receive_response()
if result is None:
break
# Check if this is a PayloadPart with bytes
if hasattr(result, "value") and hasattr(result.value, "bytes_"):
if result.value.bytes_:
response_data = result.value.bytes_.decode("utf-8")
try:
# Parse JSON response from Deepgram
parsed = json.loads(response_data)
# Extract and process transcript if available
if "channel" in parsed:
await self._handle_transcript_response(parsed)
except json.JSONDecodeError:
logger.warning(f"Non-JSON response: {response_data}")
except asyncio.CancelledError:
logger.debug("Response processor cancelled")
except Exception as e:
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
finally:
logger.debug("Response processor stopped")
async def _handle_transcript_response(self, parsed: dict):
"""Handle a transcript response from Deepgram.
Extracts the transcript text, determines if it's final or interim, extracts
language information, and pushes the appropriate frame (TranscriptionFrame
or InterimTranscriptionFrame) downstream.
Args:
parsed: The parsed JSON response from Deepgram containing channel,
alternatives, transcript, and metadata.
"""
alternatives = parsed.get("channel", {}).get("alternatives", [])
if not alternatives or not alternatives[0].get("transcript"):
return
transcript = alternatives[0]["transcript"]
if not transcript.strip():
return
# Stop TTFB metrics on first transcript
await self.stop_ttfb_metrics()
is_final = parsed.get("is_final", False)
speech_final = parsed.get("speech_final", False)
# Extract language if available
language = None
if alternatives[0].get("languages"):
language = alternatives[0]["languages"][0]
language = Language(language)
if is_final and speech_final:
# Final transcription
await self.push_frame(
TranscriptionFrame(
transcript,
self._user_id,
time_now_iso8601(),
language,
result=parsed,
)
)
await self._handle_transcription(transcript, is_final, language)
await self.stop_processing_metrics()
else:
# Interim transcription
await self.push_frame(
InterimTranscriptionFrame(
transcript,
self._user_id,
time_now_iso8601(),
language,
result=parsed,
)
)
@traced_stt
async def _handle_transcription(
self, transcript: str, is_final: bool, language: Optional[Language] = None
):
"""Handle a transcription result with tracing.
This method is decorated with @traced_stt for observability and tracing
integration. The actual transcription processing is handled by the parent
class and observers.
Args:
transcript: The transcribed text.
is_final: Whether this is a final transcription result.
language: The detected language of the transcription, if available.
"""
pass
async def start_metrics(self):
"""Start TTFB and processing metrics collection."""
await self.start_ttfb_metrics()
await self.start_processing_metrics()
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames with Deepgram SageMaker-specific handling.
Args:
frame: The frame to process.
direction: The direction of frame processing.
"""
await super().process_frame(frame, direction)
# Start metrics when user starts speaking (if VAD is not provided by Deepgram)
if isinstance(frame, UserStartedSpeakingFrame):
await self.start_metrics()
elif isinstance(frame, UserStoppedSpeakingFrame):
# Send finalize message to Deepgram when user stops speaking
# This tells Deepgram to flush any remaining audio and return final results
if self._client and self._client.is_active:
try:
await self._client.send_json({"type": "Finalize"})
except Exception as e:
logger.warning(f"Error sending Finalize message: {e}")

View File

@@ -10,35 +10,45 @@ This module provides integration with Deepgram's text-to-speech API
for generating speech from text using various voice models.
"""
import json
from typing import AsyncGenerator, Optional
import aiohttp
from loguru import logger
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
InterruptionFrame,
LLMFullResponseEndFrame,
StartFrame,
TTSAudioRawFrame,
TTSStartedFrame,
TTSStoppedFrame,
)
from pipecat.services.tts_service import TTSService
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.tts_service import TTSService, WebsocketTTSService
from pipecat.utils.tracing.service_decorators import traced_tts
try:
from deepgram import DeepgramClient, DeepgramClientOptions, SpeakOptions
from websockets.asyncio.client import connect as websocket_connect
from websockets.protocol import State
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Deepgram, you need to `pip install pipecat-ai[deepgram]`.")
logger.error(
"In order to use DeepgramWebsocketTTSService, you need to `pip install pipecat-ai[deepgram]`."
)
raise Exception(f"Missing module: {e}")
class DeepgramTTSService(TTSService):
"""Deepgram text-to-speech service.
class DeepgramTTSService(WebsocketTTSService):
"""Deepgram WebSocket-based text-to-speech service.
Provides text-to-speech synthesis using Deepgram's streaming API.
Supports various voice models and audio encoding formats with
configurable sample rates and quality settings.
Provides real-time text-to-speech synthesis using Deepgram's WebSocket API.
Supports streaming audio generation with interruption handling via the Clear
message for conversational AI use cases.
"""
def __init__(
@@ -46,51 +56,211 @@ class DeepgramTTSService(TTSService):
*,
api_key: str,
voice: str = "aura-2-helena-en",
base_url: str = "",
base_url: str = "wss://api.deepgram.com",
sample_rate: Optional[int] = None,
encoding: str = "linear16",
**kwargs,
):
"""Initialize the Deepgram TTS service.
"""Initialize the Deepgram WebSocket TTS service.
Args:
api_key: Deepgram API key for authentication.
voice: Voice model to use for synthesis. Defaults to "aura-2-helena-en".
base_url: Custom base URL for Deepgram API. Uses default if empty.
base_url: WebSocket base URL for Deepgram API. Defaults to "wss://api.deepgram.com".
sample_rate: Audio sample rate in Hz. If None, uses service default.
encoding: Audio encoding format. Defaults to "linear16".
**kwargs: Additional arguments passed to parent TTSService class.
**kwargs: Additional arguments passed to parent InterruptibleTTSService class.
"""
super().__init__(sample_rate=sample_rate, **kwargs)
self._api_key = api_key
self._base_url = base_url
self._settings = {
"encoding": encoding,
}
self.set_voice(voice)
client_options = DeepgramClientOptions(url=base_url)
self._deepgram_client = DeepgramClient(api_key, config=client_options)
self._receive_task = None
def can_generate_metrics(self) -> bool:
"""Check if the service can generate metrics.
Returns:
True, as Deepgram TTS service supports metrics generation.
True, as Deepgram WebSocket TTS service supports metrics generation.
"""
return True
@property
def includes_inter_frame_spaces(self) -> bool:
"""Indicates that Deepgram TTSTextFrames include necessary inter-frame spaces.
async def start(self, frame: StartFrame):
"""Start the Deepgram WebSocket TTS service.
Returns:
True, indicating that Deepgram's text frames include necessary inter-frame spaces.
Args:
frame: The start frame containing initialization parameters.
"""
return True
await super().start(frame)
await self._connect()
async def stop(self, frame: EndFrame):
"""Stop the Deepgram WebSocket TTS service.
Args:
frame: The end frame.
"""
await super().stop(frame)
await self._disconnect()
async def cancel(self, frame: CancelFrame):
"""Cancel the Deepgram WebSocket TTS service.
Args:
frame: The cancel frame.
"""
await super().cancel(frame)
await self._disconnect()
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames with special handling for LLM response end.
Args:
frame: The frame to process.
direction: The direction of frame processing.
"""
await super().process_frame(frame, direction)
# When the LLM finishes responding, flush any remaining text in Deepgram's buffer
if isinstance(frame, (LLMFullResponseEndFrame, EndFrame)):
await self.flush_audio()
async def _connect(self):
"""Connect to Deepgram WebSocket and start receive task."""
await self._connect_websocket()
if self._websocket and not self._receive_task:
self._receive_task = self.create_task(self._receive_task_handler(self._report_error))
async def _disconnect(self):
"""Disconnect from Deepgram WebSocket and clean up tasks."""
if self._receive_task:
await self.cancel_task(self._receive_task)
self._receive_task = None
await self._disconnect_websocket()
async def _connect_websocket(self):
"""Connect to Deepgram WebSocket API with configured settings."""
try:
if self._websocket and self._websocket.state is State.OPEN:
return
logger.debug("Connecting to Deepgram WebSocket")
# Build WebSocket URL with query parameters
params = []
params.append(f"model={self._voice_id}")
params.append(f"encoding={self._settings['encoding']}")
params.append(f"sample_rate={self.sample_rate}")
url = f"{self._base_url}/v1/speak?{'&'.join(params)}"
headers = {"Authorization": f"Token {self._api_key}"}
self._websocket = await websocket_connect(url, additional_headers=headers)
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
async def _disconnect_websocket(self):
"""Close WebSocket connection and reset state."""
try:
await self.stop_all_metrics()
if self._websocket:
logger.debug("Disconnecting from Deepgram WebSocket")
# Send Close message to gracefully close the connection
await self._websocket.send(json.dumps({"type": "Close"}))
await self._websocket.close()
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
self._websocket = None
await self._call_event_handler("on_disconnected")
def _get_websocket(self):
"""Get active websocket connection or raise exception."""
if self._websocket:
return self._websocket
raise Exception("Websocket not connected")
async def _handle_interruption(self, frame: InterruptionFrame, direction: FrameDirection):
"""Handle interruption by sending Clear message to Deepgram.
The Clear message will clear Deepgram's internal text buffer and stop
sending audio, allowing for a new response to be generated.
"""
await super()._handle_interruption(frame, direction)
# Send Clear message to stop current audio generation
if self._websocket:
try:
clear_msg = {"type": "Clear"}
await self._websocket.send(json.dumps(clear_msg))
except Exception as e:
logger.error(f"{self} error sending Clear message: {e}")
async def _receive_messages(self):
"""Receive and process messages from Deepgram WebSocket."""
async for message in self._get_websocket():
if isinstance(message, bytes):
# Binary message contains audio data
await self.stop_ttfb_metrics()
frame = TTSAudioRawFrame(message, self.sample_rate, 1)
await self.push_frame(frame)
elif isinstance(message, str):
# Text message contains metadata or control messages
try:
msg = json.loads(message)
msg_type = msg.get("type")
if msg_type == "Metadata":
logger.trace(f"Received metadata: {msg}")
elif msg_type == "Flushed":
logger.trace(f"Received Flushed: {msg}")
# Flushed indicates the end of audio generation for the current buffer
# This happens after flush_audio() is called
await self.push_frame(TTSStoppedFrame())
elif msg_type == "Cleared":
logger.trace(f"Received Cleared: {msg}")
# Buffer has been cleared after interruption
# TTSStoppedFrame will be sent by the interruption handler
elif msg_type == "Warning":
logger.warning(
f"{self} warning: {msg.get('description', 'Unknown warning')}"
)
else:
logger.debug(f"Received unknown message type: {msg}")
except json.JSONDecodeError:
logger.error(f"Invalid JSON message: {message}")
async def flush_audio(self):
"""Flush any pending audio synthesis by sending Flush command.
This should be called when the LLM finishes a complete response to force
generation of audio from Deepgram's internal text buffer.
"""
if self._websocket:
try:
flush_msg = {"type": "Flush"}
await self._websocket.send(json.dumps(flush_msg))
except Exception as e:
logger.error(f"{self} error sending Flush message: {e}")
@traced_tts
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
"""Generate speech from text using Deepgram's TTS API.
"""Generate speech from text using Deepgram's WebSocket TTS API.
Args:
text: The text to synthesize into speech.
@@ -100,33 +270,27 @@ class DeepgramTTSService(TTSService):
"""
logger.debug(f"{self}: Generating TTS [{text}]")
options = SpeakOptions(
model=self._voice_id,
encoding=self._settings["encoding"],
sample_rate=self.sample_rate,
container="none",
)
try:
# Reconnect if the websocket is closed
if not self._websocket or self._websocket.state is State.CLOSED:
await self._connect()
await self.start_ttfb_metrics()
response = await self._deepgram_client.speak.asyncrest.v("1").stream_raw(
{"text": text}, options
)
await self.start_tts_usage_metrics(text)
yield TTSStartedFrame()
async for data in response.aiter_bytes():
await self.stop_ttfb_metrics()
if data:
yield TTSAudioRawFrame(audio=data, sample_rate=self.sample_rate, num_channels=1)
# Send text message to Deepgram
# Note: We don't send Flush here - that should only be sent when the
# LLM finishes a complete response via flush_audio()
speak_msg = {"type": "Speak", "text": text}
await self._get_websocket().send(json.dumps(speak_msg))
yield TTSStoppedFrame()
# The actual audio frames will be handled in _receive_messages
yield None
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield ErrorFrame(error=f"Unknown error occurred: {e}")
class DeepgramHttpTTSService(TTSService):
@@ -177,15 +341,6 @@ class DeepgramHttpTTSService(TTSService):
"""
return True
@property
def includes_inter_frame_spaces(self) -> bool:
"""Indicates that Deepgram TTSTextFrames include necessary inter-frame spaces.
Returns:
True, indicating that Deepgram's text frames include necessary inter-frame spaces.
"""
return True
@traced_tts
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
"""Generate speech from text using Deepgram's TTS API.
@@ -245,5 +400,4 @@ class DeepgramHttpTTSService(TTSService):
yield TTSStoppedFrame()
except Exception as e:
logger.exception(f"{self} exception: {e}")
yield ErrorFrame(f"Error getting audio: {str(e)}")

View File

@@ -351,8 +351,7 @@ class ElevenLabsSTTService(SegmentedSTTService):
)
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield ErrorFrame(error=f"Unknown error occurred: {e}")
def audio_format_from_sample_rate(sample_rate: int) -> str:
@@ -416,6 +415,8 @@ class ElevenLabsRealtimeSTTService(WebsocketSTTService):
Only used when commit_strategy is VAD. None uses ElevenLabs default.
min_silence_duration_ms: Minimum silence duration for VAD (50-2000ms).
Only used when commit_strategy is VAD. None uses ElevenLabs default.
include_timestamps: Whether to include word-level timestamps in transcripts.
enable_logging: Whether to enable logging on ElevenLabs' side.
"""
language_code: Optional[str] = None
@@ -424,6 +425,8 @@ class ElevenLabsRealtimeSTTService(WebsocketSTTService):
vad_threshold: Optional[float] = None
min_speech_duration_ms: Optional[int] = None
min_silence_duration_ms: Optional[int] = None
include_timestamps: bool = False
enable_logging: bool = False
def __init__(
self,
@@ -459,6 +462,8 @@ class ElevenLabsRealtimeSTTService(WebsocketSTTService):
self._audio_format = "" # initialized in start()
self._receive_task = None
self._settings = {"language": params.language_code}
def can_generate_metrics(self) -> bool:
"""Check if the service can generate processing metrics.
@@ -477,7 +482,13 @@ class ElevenLabsRealtimeSTTService(WebsocketSTTService):
Changing language requires reconnecting to the WebSocket.
"""
logger.info(f"Switching STT language to: [{language}]")
self._params.language_code = language.value if isinstance(language, Language) else language
new_language = (
language_to_elevenlabs_language(language)
if isinstance(language, Language)
else language
)
self._params.language_code = new_language
self._settings["language"] = new_language
# Reconnect with new settings
await self._disconnect()
await self._connect()
@@ -586,7 +597,6 @@ class ElevenLabsRealtimeSTTService(WebsocketSTTService):
}
await self._websocket.send(json.dumps(message))
except Exception as e:
logger.error(f"Error sending audio: {e}")
yield ErrorFrame(f"ElevenLabs Realtime STT error: {str(e)}")
yield None
@@ -620,10 +630,16 @@ class ElevenLabsRealtimeSTTService(WebsocketSTTService):
if self._params.language_code:
params.append(f"language_code={self._params.language_code}")
params.append(f"encoding={self._audio_format}")
params.append(f"sample_rate={self.sample_rate}")
params.append(f"audio_format={self._audio_format}")
params.append(f"commit_strategy={self._params.commit_strategy.value}")
# Add optional parameters
if self._params.include_timestamps:
params.append(f"include_timestamps={str(self._params.include_timestamps).lower()}")
if self._params.enable_logging:
params.append(f"enable_logging={str(self._params.enable_logging).lower()}")
# Add VAD parameters if using VAD commit strategy and values are specified
if self._params.commit_strategy == CommitStrategy.VAD:
if self._params.vad_silence_threshold_secs is not None:
@@ -645,8 +661,9 @@ class ElevenLabsRealtimeSTTService(WebsocketSTTService):
await self._call_event_handler("on_connected")
logger.debug("Connected to ElevenLabs Realtime STT")
except Exception as e:
logger.error(f"{self}: unable to connect to ElevenLabs Realtime STT: {e}")
await self.push_error(ErrorFrame(f"Connection error: {str(e)}"))
await self.push_error(
error_msg=f"Unable to connect to ElevenLabs Realtime STT: {e}", exception=e
)
async def _disconnect_websocket(self):
"""Disconnect from ElevenLabs Realtime STT WebSocket."""
@@ -655,7 +672,7 @@ class ElevenLabsRealtimeSTTService(WebsocketSTTService):
logger.debug("Disconnecting from ElevenLabs Realtime STT")
await self._websocket.close()
except Exception as e:
logger.error(f"{self} error closing websocket: {e}")
await self.push_error(error_msg=f"Error closing websocket: {e}", exception=e)
finally:
self._websocket = None
await self._call_event_handler("on_disconnected")
@@ -712,15 +729,20 @@ class ElevenLabsRealtimeSTTService(WebsocketSTTService):
elif message_type == "committed_transcript_with_timestamps":
await self._on_committed_transcript_with_timestamps(data)
elif message_type == "input_error":
error_msg = data.get("error", "Unknown input error")
logger.error(f"ElevenLabs input error: {error_msg}")
await self.push_error(ErrorFrame(f"Input error: {error_msg}"))
elif message_type == "error":
error_msg = data.get("error", "Unknown error")
logger.error(f"ElevenLabs error: {error_msg}")
await self.push_error(error_msg=f"Error: {error_msg}")
elif message_type in ["auth_error", "quota_exceeded", "transcriber_error", "error"]:
error_msg = data.get("error", data.get("message", "Unknown error"))
logger.error(f"ElevenLabs error ({message_type}): {error_msg}")
await self.push_error(ErrorFrame(f"{message_type}: {error_msg}"))
elif message_type == "auth_error":
error_msg = data.get("error", "Authentication error")
logger.error(f"ElevenLabs auth error: {error_msg}")
await self.push_error(error_msg=f"Auth error: {error_msg}")
elif message_type == "quota_exceeded_error":
error_msg = data.get("error", "Quota exceeded")
logger.error(f"ElevenLabs quota exceeded: {error_msg}")
await self.push_error(error_msg=f"Quota exceeded: {error_msg}")
else:
logger.debug(f"Unknown message type: {message_type}")
@@ -765,6 +787,11 @@ class ElevenLabsRealtimeSTTService(WebsocketSTTService):
Args:
data: Committed transcript data.
"""
# If timestamps are enabled, skip this message and wait for the
# committed_transcript_with_timestamps message which contains all the data
if self._params.include_timestamps:
return
text = data.get("text", "").strip()
if not text:
return
@@ -792,6 +819,18 @@ class ElevenLabsRealtimeSTTService(WebsocketSTTService):
async def _on_committed_transcript_with_timestamps(self, data: dict):
"""Handle committed transcript with word-level timestamps.
This message is sent when include_timestamps=true. The result data includes:
- text: The transcribed text
- language_code: Detected language (if available)
- words: Array of word objects with timing information:
- text: The word text
- start: Start time in seconds
- end: End time in seconds
- type: "word" or "spacing"
- speaker_id: Speaker identifier (if available)
- logprob: Log probability score (if available)
- characters: Array of character strings (if available)
Args:
data: Committed transcript data with timestamps.
"""
@@ -799,9 +838,24 @@ class ElevenLabsRealtimeSTTService(WebsocketSTTService):
if not text:
return
logger.debug(f"Committed transcript with timestamps: [{text}]")
logger.trace(f"Timestamps: {data.get('words', [])}")
await self.stop_ttfb_metrics()
await self.stop_processing_metrics()
# This is sent after the committed_transcript, so we don't need to
# push another TranscriptionFrame, but we could use the timestamps
# for additional processing if needed in the future
# Get language if provided
language = data.get("language_code")
logger.debug(f"Committed transcript with timestamps: [{text}]")
await self._handle_transcription(text, True, language)
# This message is sent after committed_transcript when include_timestamps=true.
# It contains the full transcript data including text and word-level timestamps.
await self.push_frame(
TranscriptionFrame(
text,
self._user_id,
time_now_iso8601(),
language,
result=data,
)
)

View File

@@ -424,8 +424,7 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
json.dumps({"context_id": self._context_id, "close_context": True})
)
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
self._context_id = None
self._started = False
@@ -536,9 +535,8 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} exception: {e}")
self._websocket = None
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
await self._call_event_handler("on_connection_error", f"{e}")
async def _disconnect_websocket(self):
@@ -553,8 +551,7 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
await self._websocket.close()
logger.debug("Disconnected from ElevenLabs")
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
finally:
self._started = False
self._context_id = None
@@ -584,8 +581,7 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
json.dumps({"context_id": self._context_id, "close_context": True})
)
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
self._context_id = None
self._started = False
self._partial_word = ""
@@ -740,15 +736,13 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
else:
await self._send_text(text)
except Exception as e:
logger.error(f"{self} exception: {e}")
yield TTSStoppedFrame()
yield ErrorFrame(error=f"{self} error: {e}")
yield ErrorFrame(error=f"Unknown error occurred: {e}")
self._started = False
return
yield None
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield ErrorFrame(error=f"Unknown error occurred: {e}")
class ElevenLabsHttpTTSService(WordTTSService):
@@ -1043,7 +1037,6 @@ class ElevenLabsHttpTTSService(WordTTSService):
) as response:
if response.status != 200:
error_text = await response.text()
logger.error(f"{self} error: {error_text}")
yield ErrorFrame(error=f"ElevenLabs API error: {error_text}")
return
@@ -1091,8 +1084,7 @@ class ElevenLabsHttpTTSService(WordTTSService):
logger.warning(f"Failed to parse JSON from stream: {e}")
continue
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield ErrorFrame(error=f"Unknown error occurred: {e}")
continue
# After processing all chunks, emit any remaining partial word
@@ -1116,8 +1108,7 @@ class ElevenLabsHttpTTSService(WordTTSService):
self._previous_text = text
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield ErrorFrame(error=f"Unknown error occurred: {e}")
finally:
await self.stop_ttfb_metrics()
# Let the parent class handle TTSStoppedFrame

View File

@@ -110,7 +110,6 @@ class FalImageGenService(ImageGenService):
image_url = response["images"][0]["url"] if response else None
if not image_url:
logger.error(f"{self} error: image generation failed")
yield ErrorFrame("Image generation failed")
return

View File

@@ -290,5 +290,4 @@ class FalSTTService(SegmentedSTTService):
)
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield ErrorFrame(error=f"Unknown error occurred: {e}")

View File

@@ -76,7 +76,7 @@ class FishAudioTTSService(InterruptibleTTSService):
api_key: str,
reference_id: Optional[str] = None, # This is the voice ID
model: Optional[str] = None, # Deprecated
model_id: str = "speech-1.5",
model_id: str = "s1",
output_format: FishAudioOutputFormat = "pcm",
sample_rate: Optional[int] = None,
params: Optional[InputParams] = None,
@@ -93,7 +93,7 @@ class FishAudioTTSService(InterruptibleTTSService):
The `model` parameter is deprecated and will be removed in version 0.1.0.
Use `reference_id` instead to specify the voice model.
model_id: Specify which Fish Audio TTS model to use (e.g. "speech-1.5")
model_id: Specify which Fish Audio TTS model to use (e.g. "s1")
output_format: Audio output format. Defaults to "pcm".
sample_rate: Audio sample rate. If None, uses default.
params: Additional input parameters for voice customization.
@@ -159,15 +159,6 @@ class FishAudioTTSService(InterruptibleTTSService):
"""
return True
@property
def includes_inter_frame_spaces(self) -> bool:
"""Indicates that Fish Audio TTSTextFrames include necessary inter-frame spaces.
Returns:
True, indicating that Fish Audio's text frames include necessary inter-frame spaces.
"""
return True
async def set_model(self, model: str):
"""Set the TTS model and reconnect.
@@ -237,8 +228,7 @@ class FishAudioTTSService(InterruptibleTTSService):
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
@@ -252,8 +242,7 @@ class FishAudioTTSService(InterruptibleTTSService):
await self._websocket.send(ormsgpack.packb(stop_message))
await self._websocket.close()
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
finally:
self._request_id = None
self._started = False
@@ -295,8 +284,7 @@ class FishAudioTTSService(InterruptibleTTSService):
continue
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
@traced_tts
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
@@ -332,8 +320,7 @@ class FishAudioTTSService(InterruptibleTTSService):
flush_message = {"event": "flush"}
await self._get_websocket().send(ormsgpack.packb(flush_message))
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield ErrorFrame(error=f"Unknown error occurred: {e}")
yield TTSStoppedFrame()
await self._disconnect()
await self._connect()
@@ -341,5 +328,4 @@ class FishAudioTTSService(InterruptibleTTSService):
yield None
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield ErrorFrame(error=f"Unknown error occurred: {e}")

View File

@@ -468,8 +468,7 @@ class GladiaSTTService(STTService):
break
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
self._connection_active = False
if not self._should_reconnect:
@@ -559,8 +558,7 @@ class GladiaSTTService(STTService):
except websockets.exceptions.ConnectionClosed:
logger.debug("Connection closed during keepalive")
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
async def _receive_task_handler(self):
try:
@@ -623,8 +621,7 @@ class GladiaSTTService(STTService):
# Expected when closing the connection
pass
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
async def _maybe_reconnect(self) -> bool:
"""Handle exponential backoff reconnection logic."""
@@ -632,7 +629,9 @@ class GladiaSTTService(STTService):
return False
self._reconnection_attempts += 1
if self._reconnection_attempts > self._max_reconnection_attempts:
logger.error(f"Max reconnection attempts ({self._max_reconnection_attempts}) reached")
await self.push_error(
error_msg=f"Max reconnection attempts ({self._max_reconnection_attempts}) reached",
)
self._should_reconnect = False
return False
delay = self._reconnection_delay * (2 ** (self._reconnection_attempts - 1))

View File

@@ -27,6 +27,7 @@ from pydantic import BaseModel, Field
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.adapters.services.gemini_adapter import GeminiLLMAdapter
from pipecat.frames.frames import (
AggregationType,
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
CancelFrame,
@@ -1174,7 +1175,7 @@ class GeminiLiveLLMService(LLMService):
self._connection_task = self.create_task(self._connection_task_handler(config=config))
except Exception as e:
await self.push_error(ErrorFrame(error=f"{self} Initialization error: {e}"))
await self.push_error(error_msg=f"Initialization error: {e}", exception=e)
async def _connection_task_handler(self, config: LiveConnectConfig):
async with self._client.aio.live.connect(model=self._model_name, config=config) as session:
@@ -1251,11 +1252,11 @@ class GeminiLiveLLMService(LLMService):
)
if self._consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
logger.error(
error_msg = (
f"Max consecutive failures ({MAX_CONSECUTIVE_FAILURES}) reached, "
"treating as fatal error"
)
await self.push_error(ErrorFrame(error=f"{self} Error in receive loop: {error}"))
await self.push_error(error_msg=error_msg, exception=error)
return False
else:
logger.info(
@@ -1283,7 +1284,7 @@ class GeminiLiveLLMService(LLMService):
self._completed_tool_calls = set()
self._disconnecting = False
except Exception as e:
logger.error(f"{self} error disconnecting: {e}")
await self.push_error(error_msg=f"Error disconnecting: {e}", exception=e)
async def _send_user_audio(self, frame):
"""Send user audio frame to Gemini Live API."""
@@ -1447,13 +1448,11 @@ class GeminiLiveLLMService(LLMService):
# Update bot responding state and send service start frame
# (AUDIO modality case)
await self._set_bot_is_responding(True)
await self.push_frame(LLMFullResponseStartFrame())
await self.push_frame(LLMFullResponseStartFrame(skip_tts=self._get_skip_tts()))
self._bot_text_buffer += text
self._search_result_buffer += text # Also accumulate for grounding
frame = LLMTextFrame(text=text)
# Gemini Live text already includes any necessary inter-chunk spaces
frame.includes_inter_frame_spaces = True
frame = LLMTextFrame(text=text, skip_tts=self._get_skip_tts())
await self.push_frame(frame)
# Check for grounding metadata in server content
@@ -1492,7 +1491,7 @@ class GeminiLiveLLMService(LLMService):
if not self._bot_is_responding:
await self._set_bot_is_responding(True)
await self.push_frame(TTSStartedFrame())
await self.push_frame(LLMFullResponseStartFrame())
await self.push_frame(LLMFullResponseStartFrame(skip_tts=self._get_skip_tts()))
self._bot_audio_buffer.extend(audio)
frame = TTSAudioRawFrame(
@@ -1553,10 +1552,10 @@ class GeminiLiveLLMService(LLMService):
if not text:
# AUDIO modality case
await self.push_frame(TTSStoppedFrame())
await self.push_frame(LLMFullResponseEndFrame())
await self.push_frame(LLMFullResponseEndFrame(skip_tts=self._get_skip_tts()))
else:
# TEXT modality case
await self.push_frame(LLMFullResponseEndFrame())
await self.push_frame(LLMFullResponseEndFrame(skip_tts=self._get_skip_tts()))
@traced_stt
async def _handle_user_transcription(
@@ -1644,9 +1643,9 @@ class GeminiLiveLLMService(LLMService):
if not self._bot_is_responding:
await self._set_bot_is_responding(True)
await self.push_frame(TTSStartedFrame())
await self.push_frame(LLMFullResponseStartFrame())
await self.push_frame(LLMFullResponseStartFrame(skip_tts=self._get_skip_tts()))
frame = TTSTextFrame(text=text)
frame = TTSTextFrame(text=text, aggregated_by=AggregationType.SENTENCE)
# Gemini Live text already includes any necessary inter-chunk spaces
frame.includes_inter_frame_spaces = True
@@ -1724,6 +1723,8 @@ class GeminiLiveLLMService(LLMService):
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
total_tokens=total_tokens,
cache_read_input_tokens=usage.cached_content_token_count,
reasoning_tokens=usage.thoughts_token_count,
)
await self.start_llm_usage_metrics(tokens)
@@ -1744,7 +1745,7 @@ class GeminiLiveLLMService(LLMService):
# state management, and that exponential backoff for retries can have
# cost/stability implications for a service cluster, let's just treat a
# send-side error as fatal.
await self.push_error(ErrorFrame(error=f"{self} Send error: {error}", fatal=True))
await self.push_error(error_msg=f"Send error: {error}")
def create_context_aggregator(
self,

View File

@@ -110,7 +110,6 @@ class GoogleImageGenService(ImageGenService):
await self.stop_ttfb_metrics()
if not response or not response.generated_images:
logger.error(f"{self} error: image generation failed")
yield ErrorFrame("Image generation failed")
return
@@ -128,5 +127,4 @@ class GoogleImageGenService(ImageGenService):
yield frame
except Exception as e:
logger.error(f"{self} error generating image: {e}")
yield ErrorFrame(f"Image generation error: {str(e)}")

View File

@@ -793,7 +793,7 @@ class GoogleLLMService(LLMService):
return
generation_params.setdefault("thinking_config", {})["thinking_budget"] = 0
except Exception as e:
logger.exception(f"Failed to unset thinking budget: {e}")
logger.error(f"Failed to unset thinking budget: {e}")
async def _stream_content(
self, params_from_context: GeminiLLMInvocationParams
@@ -876,7 +876,7 @@ class GoogleLLMService(LLMService):
@traced_llm
async def _process_context(self, context: OpenAILLMContext | LLMContext):
await self.push_frame(LLMFullResponseStartFrame())
await self.push_frame(LLMFullResponseStartFrame(skip_tts=self._get_skip_tts()))
prompt_tokens = 0
completion_tokens = 0
@@ -920,9 +920,9 @@ class GoogleLLMService(LLMService):
for part in candidate.content.parts:
if not part.thought and part.text:
search_result += part.text
frame = LLMTextFrame(part.text)
frame.includes_inter_frame_spaces = True
await self.push_frame(frame)
await self.push_frame(
LLMTextFrame(part.text, skip_tts=self._get_skip_tts())
)
elif part.function_call:
function_call = part.function_call
id = function_call.id or str(uuid.uuid4())
@@ -985,7 +985,7 @@ class GoogleLLMService(LLMService):
except DeadlineExceeded:
await self._call_event_handler("on_completion_timeout")
except Exception as e:
logger.exception(f"{self} exception: {e}")
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
finally:
if grounding_metadata and isinstance(grounding_metadata, dict):
llm_search_frame = LLMSearchResponseFrame(
@@ -1004,7 +1004,7 @@ class GoogleLLMService(LLMService):
reasoning_tokens=reasoning_tokens,
)
)
await self.push_frame(LLMFullResponseEndFrame())
await self.push_frame(LLMFullResponseEndFrame(skip_tts=self._get_skip_tts()))
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process incoming frames and handle different frame types.

View File

@@ -136,7 +136,9 @@ class GoogleLLMOpenAIBetaService(OpenAILLMService):
# Keep iterating through the response to collect all the argument fragments
arguments += tool_call.function.arguments
elif chunk.choices[0].delta.content:
await self.push_frame(LLMTextFrame(chunk.choices[0].delta.content))
await self.push_frame(
LLMTextFrame(chunk.choices[0].delta.content, skip_tts=self._get_skip_tts())
)
# if we got a function name and arguments, check to see if it's a function with
# a registered handler. If so, run the registered callback, save the result to

View File

@@ -774,8 +774,7 @@ class GoogleSTTService(STTService):
yield cloud_speech.StreamingRecognizeRequest(audio=audio_data)
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
raise
async def _stream_audio(self):
@@ -806,15 +805,13 @@ class GoogleSTTService(STTService):
break
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
await asyncio.sleep(1) # Brief delay before reconnecting
self._stream_start_time = int(time.time() * 1000)
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
"""Process an audio chunk for STT transcription.
@@ -902,8 +899,7 @@ class GoogleSTTService(STTService):
)
raise
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
# Re-raise the exception to let it propagate (e.g. in the case of a
# timeout, propagate to _stream_audio to reconnect)
raise

View File

@@ -596,15 +596,6 @@ class GoogleHttpTTSService(TTSService):
"""
return True
@property
def includes_inter_frame_spaces(self) -> bool:
"""Indicates that Google TTSTextFrames include necessary inter-frame spaces.
Returns:
True, indicating that Google's text frames include necessary inter-frame spaces.
"""
return True
def language_to_service_language(self, language: Language) -> Optional[str]:
"""Convert a Language enum to Google TTS language format.
@@ -746,7 +737,6 @@ class GoogleHttpTTSService(TTSService):
yield TTSStoppedFrame()
except Exception as e:
logger.error(f"{self} exception: {e}")
error_message = f"TTS generation error: {str(e)}"
yield ErrorFrame(error=error_message)
@@ -803,15 +793,6 @@ class GoogleBaseTTSService(TTSService):
"""
return True
@property
def includes_inter_frame_spaces(self) -> bool:
"""Indicates that Google and Gemini TTSTextFrames include necessary inter-frame spaces.
Returns:
True, indicating that Google's text frames include necessary inter-frame spaces.
"""
return True
def language_to_service_language(self, language: Language) -> Optional[str]:
"""Convert a Language enum to Google TTS language format.
@@ -1014,9 +995,7 @@ class GoogleTTSService(GoogleBaseTTSService):
yield frame
except Exception as e:
logger.error(f"{self} exception: {e}")
error_message = f"TTS generation error: {str(e)}"
yield ErrorFrame(error=error_message)
await self.push_error(error_msg=f"TTS generation error: {str(e)}", exception=e)
class GeminiTTSService(GoogleBaseTTSService):
@@ -1266,6 +1245,5 @@ class GeminiTTSService(GoogleBaseTTSService):
yield frame
except Exception as e:
logger.error(f"{self} exception: {e}")
error_message = f"Gemini TTS generation error: {str(e)}"
yield ErrorFrame(error=error_message)

View File

@@ -123,6 +123,8 @@ class GrokLLMService(OpenAILLMService):
self._prompt_tokens = 0
self._completion_tokens = 0
self._total_tokens = 0
self._cache_read_input_tokens = None
self._reasoning_tokens = None
self._has_reported_prompt_tokens = False
self._is_processing = True
@@ -137,6 +139,8 @@ class GrokLLMService(OpenAILLMService):
prompt_tokens=self._prompt_tokens,
completion_tokens=self._completion_tokens,
total_tokens=self._total_tokens,
cache_read_input_tokens=self._cache_read_input_tokens,
reasoning_tokens=self._reasoning_tokens,
)
await super().start_llm_usage_metrics(tokens)
@@ -149,7 +153,7 @@ class GrokLLMService(OpenAILLMService):
Args:
tokens: The token usage metrics for the current chunk of processing,
containing prompt_tokens and completion_tokens counts.
containing prompt_tokens, completion_tokens, and optional cached/reasoning tokens.
"""
# Only accumulate metrics during active processing
if not self._is_processing:
@@ -164,6 +168,13 @@ class GrokLLMService(OpenAILLMService):
if tokens.completion_tokens > self._completion_tokens:
self._completion_tokens = tokens.completion_tokens
# Capture cached & reasoning tokens (these typically only appear once per request)
if tokens.cache_read_input_tokens is not None:
self._cache_read_input_tokens = tokens.cache_read_input_tokens
if tokens.reasoning_tokens is not None:
self._reasoning_tokens = tokens.reasoning_tokens
def create_context_aggregator(
self,
context: OpenAILLMContext,

View File

@@ -111,15 +111,6 @@ class GroqTTSService(TTSService):
"""
return True
@property
def includes_inter_frame_spaces(self) -> bool:
"""Indicates that Groq TTSTextFrames include necessary inter-frame spaces.
Returns:
True, indicating that Groq's text frames include necessary inter-frame spaces.
"""
return True
@traced_tts
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
"""Generate speech from text using Groq's TTS API.
@@ -155,7 +146,6 @@ class GroqTTSService(TTSService):
bytes = w.readframes(num_frames)
yield TTSAudioRawFrame(bytes, frame_rate, channels)
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield ErrorFrame(error=f"Unknown error occurred: {e}")
yield TTSStoppedFrame()

View File

@@ -179,7 +179,7 @@ class HeyGenClient:
await self._task_manager.cancel_task(self._event_task)
self._event_task = None
except Exception as e:
logger.exception(f"Exception during cleanup: {e}")
logger.error(f"Exception during cleanup: {e}")
async def start(self, frame: StartFrame, audio_chunk_size: int) -> None:
"""Start the client and establish all necessary connections.

View File

@@ -14,12 +14,14 @@ from pydantic import BaseModel
from pipecat.frames.frames import (
ErrorFrame,
Frame,
InterruptionFrame,
StartFrame,
TTSAudioRawFrame,
TTSStartedFrame,
TTSStoppedFrame,
)
from pipecat.services.tts_service import TTSService
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.tts_service import WordTTSService
from pipecat.utils.tracing.service_decorators import traced_tts
try:
@@ -29,6 +31,7 @@ try:
PostedUtterance,
PostedUtteranceVoiceWithId,
)
from hume.tts.types import TimestampMessage
except ModuleNotFoundError as e: # pragma: no cover - import-time guidance
logger.error(f"Exception: {e}")
logger.error("In order to use Hume, you need to `pip install pipecat-ai[hume]`.")
@@ -38,7 +41,7 @@ except ModuleNotFoundError as e: # pragma: no cover - import-time guidance
HUME_SAMPLE_RATE = 48_000 # Hume TTS streams at 48 kHz
class HumeTTSService(TTSService):
class HumeTTSService(WordTTSService):
"""Hume Octave Text-to-Speech service.
Streams PCM audio via Hume's HTTP output streaming (JSON chunks) endpoint
@@ -48,6 +51,7 @@ class HumeTTSService(TTSService):
- Generates speech from text using Hume TTS.
- Streams PCM audio.
- Supports word-level timestamps for precise audio-text synchronization.
- Supports dynamic updates of voice and synthesis parameters at runtime.
- Provides metrics for Time To First Byte (TTFB) and TTS usage.
"""
@@ -92,7 +96,13 @@ class HumeTTSService(TTSService):
f"Hume TTS streams at {HUME_SAMPLE_RATE} Hz; configured sample_rate={sample_rate}"
)
super().__init__(sample_rate=sample_rate, **kwargs)
# WordTTSService sets push_text_frames=False by default, which we want
super().__init__(
sample_rate=sample_rate,
push_text_frames=False,
push_stop_frames=True,
**kwargs,
)
self._client = AsyncHumeClient(api_key=api_key)
self._params = params or HumeTTSService.InputParams()
@@ -102,6 +112,10 @@ class HumeTTSService(TTSService):
self._audio_bytes = b""
# Track cumulative time for word timestamps across utterances
self._cumulative_time = 0.0
self._started = False
def can_generate_metrics(self) -> bool:
"""Can generate metrics.
@@ -110,15 +124,6 @@ class HumeTTSService(TTSService):
"""
return True
@property
def includes_inter_frame_spaces(self) -> bool:
"""Indicates that Hume TTSTextFrames include necessary inter-frame spaces.
Returns:
True, indicating that Hume's text frames include necessary inter-frame spaces.
"""
return True
async def start(self, frame: StartFrame) -> None:
"""Start the service.
@@ -126,6 +131,27 @@ class HumeTTSService(TTSService):
frame: The start frame.
"""
await super().start(frame)
self._reset_state()
def _reset_state(self):
"""Reset internal state variables."""
self._cumulative_time = 0.0
self._started = False
async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
"""Push a frame and handle state changes.
Args:
frame: The frame to push.
direction: The direction to push the frame.
"""
await super().push_frame(frame, direction)
if isinstance(frame, (InterruptionFrame, TTSStoppedFrame)):
# Reset timing on interruption or stop
self._reset_state()
if isinstance(frame, TTSStoppedFrame):
await self.add_word_timestamps([("Reset", 0)])
async def update_setting(self, key: str, value: Any) -> None:
"""Runtime updates via `TTSUpdateSettingsFrame`.
@@ -142,7 +168,7 @@ class HumeTTSService(TTSService):
if key_l == "voice_id":
self.set_voice(str(value))
logger.info(f"HumeTTSService voice_id set to: {self.voice}")
logger.debug(f"HumeTTSService voice_id set to: {self.voice}")
elif key_l == "description":
self._params.description = None if value is None else str(value)
elif key_l == "speed":
@@ -155,7 +181,7 @@ class HumeTTSService(TTSService):
@traced_tts
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
"""Generate speech from text using Hume TTS.
"""Generate speech from text using Hume TTS with word timestamps.
Args:
text: The text to be synthesized.
@@ -186,7 +212,12 @@ class HumeTTSService(TTSService):
await self.start_ttfb_metrics()
await self.start_tts_usage_metrics(text)
yield TTSStartedFrame()
# Start TTS sequence if not already started
if not self._started:
self.start_word_timestamps()
yield TTSStartedFrame()
self._started = True
try:
# Instant mode is always enabled here (not user-configurable)
@@ -197,23 +228,50 @@ class HumeTTSService(TTSService):
# Use version "2" by default if no description is provided
# Version "1" is needed when description is used
version = "1" if self._params.description is not None else "2"
# Track the duration of this utterance based on the last timestamp
utterance_duration = 0.0
async for chunk in self._client.tts.synthesize_json_streaming(
utterances=[utterance],
format=pcm_fmt,
instant_mode=True,
version=version,
include_timestamp_types=["word"], # Request word-level timestamps
):
# Process audio chunks
audio_b64 = getattr(chunk, "audio", None)
if not audio_b64:
continue
if audio_b64:
await self.stop_ttfb_metrics()
pcm_bytes = base64.b64decode(audio_b64)
self._audio_bytes += pcm_bytes
pcm_bytes = base64.b64decode(audio_b64)
self._audio_bytes += pcm_bytes
# Buffer audio until we have enough to avoid glitches
if len(self._audio_bytes) >= self.chunk_size:
frame = TTSAudioRawFrame(
audio=self._audio_bytes,
sample_rate=self.sample_rate,
num_channels=1,
)
yield frame
self._audio_bytes = b""
# Buffer audio until we have enough to avoid glitches
if len(self._audio_bytes) < self.chunk_size:
continue
# Process timestamp messages
if isinstance(chunk, TimestampMessage):
timestamp = chunk.timestamp
if timestamp.type == "word":
# Convert milliseconds to seconds and add cumulative offset
word_start_time = self._cumulative_time + (timestamp.time.begin / 1000.0)
word_end_time = self._cumulative_time + (timestamp.time.end / 1000.0)
# Track the maximum end time for this utterance
utterance_duration = max(utterance_duration, word_end_time)
# Add word timestamp
await self.add_word_timestamps([(timestamp.text, word_start_time)])
# Flush any remaining audio bytes
if self._audio_bytes:
frame = TTSAudioRawFrame(
audio=self._audio_bytes,
sample_rate=self.sample_rate,
@@ -224,10 +282,13 @@ class HumeTTSService(TTSService):
self._audio_bytes = b""
# Update cumulative time for next utterance
if utterance_duration > 0:
self._cumulative_time = utterance_duration
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
finally:
# Ensure TTFB timer is stopped even on early failures
await self.stop_ttfb_metrics()
yield TTSStoppedFrame()
# Let the parent class handle TTSStoppedFrame via push_stop_frames

View File

@@ -146,6 +146,8 @@ class InworldTTSService(TTSService):
Parameters:
temperature: Voice temperature control for synthesis variability (e.g., 1.1).
Valid range: [0, 2]. Higher values increase variability.
speaking_rate: Speaking speed control (range: [0.5, 1.5]). Defaults to 1.0 when
unset.
Note:
Language is automatically inferred from the input text by Inworld's TTS models,
@@ -153,6 +155,7 @@ class InworldTTSService(TTSService):
"""
temperature: Optional[float] = None # optional temperature control (range: [0, 2])
speaking_rate: Optional[float] = None # optional speaking rate control (range: [0.5, 1.5])
def __init__(
self,
@@ -198,6 +201,7 @@ class InworldTTSService(TTSService):
- Other formats as supported by Inworld API
params: Optional input parameters for additional configuration. Use this to specify:
- temperature: Voice temperature control for variability (range: [0, 2], e.g., 1.1, optional)
- speaking_rate: Set desired speaking speed (range: [0.5, 1.5], optional)
Language is automatically inferred from input text.
**kwargs: Additional arguments passed to the parent TTSService class.
@@ -228,15 +232,18 @@ class InworldTTSService(TTSService):
self._settings = {
"voiceId": voice_id, # Voice selection from direct parameter
"modelId": model, # TTS model selection from direct parameter
"audio_config": { # Audio format configuration
"audio_encoding": encoding, # Format: LINEAR16, MP3, etc.
"sample_rate_hertz": 0, # Will be set in start() from parent service
"audioConfig": { # Audio format configuration
"audioEncoding": encoding, # Format: LINEAR16, MP3, etc.
"sampleRateHertz": 0, # Will be set in start() from parent service
},
}
# Add optional temperature parameter if provided (valid range: [0, 2])
if params and params.temperature is not None:
self._settings["temperature"] = params.temperature
# Add optional speaking rate if provided (valid range: [0.5, 1.5])
if params and params.speaking_rate is not None:
self._settings["audioConfig"]["speakingRate"] = params.speaking_rate
# Register voice and model with parent service for metrics and tracking
self.set_voice(voice_id) # Used for logging and metrics
@@ -250,15 +257,6 @@ class InworldTTSService(TTSService):
"""
return True
@property
def includes_inter_frame_spaces(self) -> bool:
"""Indicates that Inworld TTSTextFrames include necessary inter-frame spaces.
Returns:
True, indicating that Inworld's text frames include necessary inter-frame spaces.
"""
return True
async def start(self, frame: StartFrame):
"""Start the Inworld TTS service.
@@ -266,7 +264,7 @@ class InworldTTSService(TTSService):
frame: The start frame containing initialization parameters.
"""
await super().start(frame)
self._settings["audio_config"]["sample_rate_hertz"] = self.sample_rate
self._settings["audioConfig"]["sampleRateHertz"] = self.sample_rate
async def stop(self, frame: EndFrame):
"""Stop the Inworld TTS service.
@@ -332,9 +330,7 @@ class InworldTTSService(TTSService):
"text": text, # Text to synthesize
"voiceId": self._settings["voiceId"], # Voice selection (Ashley, Hades, etc.)
"modelId": self._settings["modelId"], # TTS model (inworld-tts-1)
"audio_config": self._settings[
"audio_config"
], # Audio format settings (LINEAR16, 48kHz)
"audioConfig": self._settings["audioConfig"], # Audio format settings (LINEAR16, 48kHz)
}
# Add optional temperature parameter if configured (valid range: [0, 2])
@@ -401,8 +397,7 @@ class InworldTTSService(TTSService):
# STEP 7: ERROR HANDLING
# ================================================================================
# Log any unexpected errors and notify the pipeline
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
finally:
# ================================================================================
# STEP 8: CLEANUP AND COMPLETION
@@ -517,7 +512,7 @@ class InworldTTSService(TTSService):
# Extract the base64-encoded audio content from response
if "audioContent" not in response_data:
logger.error("No audioContent in Inworld API response")
await self.push_error(ErrorFrame(error="No audioContent in response"))
yield ErrorFrame(error="No audioContent in response")
return
# ================================================================================

View File

@@ -9,17 +9,7 @@
import asyncio
import inspect
from dataclasses import dataclass
from typing import (
Any,
Awaitable,
Callable,
Dict,
Mapping,
Optional,
Protocol,
Sequence,
Type,
)
from typing import Any, Awaitable, Callable, Dict, Mapping, Optional, Protocol, Sequence, Type
from loguru import logger
@@ -285,17 +275,13 @@ class LLMService(AIService):
elif isinstance(frame, LLMConfigureOutputFrame):
self._skip_tts = frame.skip_tts
async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
"""Pushes a frame.
def _get_skip_tts(self) -> bool:
"""Get the current skip_tts configuration.
Args:
frame: The frame to push.
direction: The direction of frame pushing.
Returns:
The current skip_tts setting for frames generated by this LLM.
"""
if isinstance(frame, (LLMTextFrame, LLMFullResponseStartFrame, LLMFullResponseEndFrame)):
frame.skip_tts = self._skip_tts
await super().push_frame(frame, direction)
return self._skip_tts
async def _handle_interruptions(self, _: InterruptionFrame):
for function_name, entry in self._functions.items():

View File

@@ -124,15 +124,6 @@ class LmntTTSService(InterruptibleTTSService):
"""
return True
@property
def includes_inter_frame_spaces(self) -> bool:
"""Indicates that LMNT TTSTextFrames include necessary inter-frame spaces.
Returns:
True, indicating that LMNT's text frames include necessary inter-frame spaces.
"""
return True
def language_to_service_language(self, language: Language) -> Optional[str]:
"""Convert a Language enum to LMNT service language format.
@@ -223,8 +214,7 @@ class LmntTTSService(InterruptibleTTSService):
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
@@ -240,8 +230,7 @@ class LmntTTSService(InterruptibleTTSService):
# await self._websocket.send(json.dumps({"eof": True}))
await self._websocket.close()
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Error disconnecting from LMNT: {e}", exception=e)
finally:
self._started = False
self._websocket = None
@@ -275,10 +264,9 @@ class LmntTTSService(InterruptibleTTSService):
try:
msg = json.loads(message)
if "error" in msg:
logger.error(f"{self} error: {msg['error']}")
await self.push_frame(TTSStoppedFrame())
await self.stop_all_metrics()
await self.push_error(ErrorFrame(error=f"{self} error: {msg['error']}"))
await self.push_error(error_msg=f"Error: {msg['error']}")
return
except json.JSONDecodeError:
logger.error(f"Invalid JSON message: {message}")
@@ -311,13 +299,11 @@ class LmntTTSService(InterruptibleTTSService):
await self._get_websocket().send(json.dumps({"flush": True}))
await self.start_tts_usage_metrics(text)
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield ErrorFrame(error=f"Unknown error occurred: {e}")
yield TTSStoppedFrame()
await self._disconnect()
await self._connect()
return
yield None
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield ErrorFrame(error=f"Unknown error occurred: {e}")

View File

@@ -176,7 +176,6 @@ class MCPClient(BaseObject):
except Exception as e:
error_msg = f"Error calling mcp tool {params.function_name}: {str(e)}"
logger.error(error_msg)
logger.exception("Full exception details:")
await params.result_callback(error_msg)
async def _stdio_list_tools(self) -> ToolsSchema:
@@ -207,7 +206,6 @@ class MCPClient(BaseObject):
except Exception as e:
error_msg = f"Error calling mcp tool {params.function_name}: {str(e)}"
logger.error(error_msg)
logger.exception("Full exception details:")
await params.result_callback(error_msg)
async def _streamable_http_list_tools(self) -> ToolsSchema:
@@ -246,7 +244,6 @@ class MCPClient(BaseObject):
except Exception as e:
error_msg = f"Error calling mcp tool {params.function_name}: {str(e)}"
logger.error(error_msg)
logger.exception("Full exception details:")
await params.result_callback(error_msg)
async def _call_tool(self, session, function_name, arguments, result_callback):
@@ -302,7 +299,6 @@ class MCPClient(BaseObject):
except Exception as e:
logger.error(f"Failed to read tool '{tool_name}': {str(e)}")
logger.exception("Full exception details:")
continue
logger.debug(f"Completed reading {len(tool_schemas)} tools")

View File

@@ -253,8 +253,9 @@ class Mem0MemoryService(FrameProcessor):
# Otherwise, pass the enhanced context frame downstream
await self.push_frame(frame)
except Exception as e:
logger.error(f"Error processing with Mem0: {str(e)}")
await self.push_frame(ErrorFrame(f"Error processing with Mem0: {str(e)}"))
await self.push_error(
error_msg=f"Error processing with Mem0: {str(e)}", exception=e
)
await self.push_frame(frame) # Still pass the original frame through
else:
# For non-context frames, just pass them through

View File

@@ -40,24 +40,40 @@ def language_to_minimax_language(language: Language) -> Optional[str]:
The corresponding MiniMax language name, or None if not supported.
"""
LANGUAGE_MAP = {
Language.AF: "Afrikaans",
Language.AR: "Arabic",
Language.BG: "Bulgarian",
Language.CA: "Catalan",
Language.CS: "Czech",
Language.DA: "Danish",
Language.DE: "German",
Language.EL: "Greek",
Language.EN: "English",
Language.ES: "Spanish",
Language.FA: "Persian", # ⚠️ Only supported by speech-2.6-* models
Language.FI: "Finnish",
Language.FIL: "Filipino", # ⚠️ Only supported by speech-2.6-* models
Language.FR: "French",
Language.HE: "Hebrew",
Language.HI: "Hindi",
Language.HR: "Croatian",
Language.HU: "Hungarian",
Language.ID: "Indonesian",
Language.IT: "Italian",
Language.JA: "Japanese",
Language.KO: "Korean",
Language.MS: "Malay",
Language.NB: "Norwegian",
Language.NN: "Nynorsk",
Language.NL: "Dutch",
Language.PL: "Polish",
Language.PT: "Portuguese",
Language.RO: "Romanian",
Language.RU: "Russian",
Language.SK: "Slovak",
Language.SL: "Slovenian",
Language.SV: "Swedish",
Language.TA: "Tamil", # ⚠️ Only supported by speech-2.6-* models
Language.TH: "Thai",
Language.TR: "Turkish",
Language.UK: "Ukrainian",
@@ -84,13 +100,22 @@ class MiniMaxHttpTTSService(TTSService):
"""Configuration parameters for MiniMax TTS.
Parameters:
language: Language for TTS generation.
language: Language for TTS generation. Supports 40 languages.
Note: Filipino, Tamil, and Persian require speech-2.6-* models.
speed: Speech speed (range: 0.5 to 2.0).
volume: Speech volume (range: 0 to 10).
pitch: Pitch adjustment (range: -12 to 12).
emotion: Emotional tone (options: "happy", "sad", "angry", "fearful",
"disgusted", "surprised", "neutral").
english_normalization: Whether to apply English text normalization.
"disgusted", "surprised", "calm", "fluent").
english_normalization: Deprecated; use `text_normalization` instead
.. deprecated:: 0.0.96
The `english_normalization` parameter is deprecated and will be removed in a future version.
Use the `text_normalization` parameter instead.
text_normalization: Enable text normalization (Chinese/English).
latex_read: Enable LaTeX formula reading.
exclude_aggregated_audio: Whether to exclude aggregated audio in final chunk.
"""
language: Optional[Language] = Language.EN
@@ -98,7 +123,10 @@ class MiniMaxHttpTTSService(TTSService):
volume: Optional[float] = 1.0
pitch: Optional[int] = 0
emotion: Optional[str] = None
english_normalization: Optional[bool] = None
english_normalization: Optional[bool] = None # Deprecated
text_normalization: Optional[bool] = None
latex_read: Optional[bool] = None
exclude_aggregated_audio: Optional[bool] = None
def __init__(
self,
@@ -120,9 +148,12 @@ class MiniMaxHttpTTSService(TTSService):
base_url: API base URL, defaults to MiniMax's T2A endpoint.
Global: https://api.minimax.io/v1/t2a_v2
Mainland China: https://api.minimaxi.chat/v1/t2a_v2
Western United States: https://api-uw.minimax.io/v1/t2a_v2
group_id: MiniMax Group ID to identify project.
model: TTS model name. Defaults to "speech-02-turbo". Options include
"speech-02-hd", "speech-02-turbo", "speech-01-hd", "speech-01-turbo".
model: TTS model name. Defaults to "speech-02-turbo". Options include:
"speech-2.6-hd", "speech-2.6-turbo" (latest, supports Filipino/Tamil/Persian),
"speech-02-hd", "speech-02-turbo",
"speech-01-hd", "speech-01-turbo".
voice_id: Voice identifier. Defaults to "Calm_Woman".
aiohttp_session: aiohttp.ClientSession for API communication.
sample_rate: Output audio sample rate in Hz. If None, uses pipeline default.
@@ -176,15 +207,34 @@ class MiniMaxHttpTTSService(TTSService):
"disgusted",
"surprised",
"neutral",
"fluent",
]
if params.emotion in supported_emotions:
self._settings["voice_setting"]["emotion"] = params.emotion
else:
logger.warning(f"Unsupported emotion: {params.emotion}. Using default.")
logger.warning(
f"Unsupported emotion: {params.emotion}. Supported emotions: {supported_emotions}"
)
# Add english_normalization if provided
# If `english_normalization`, add `text_normalization` and print warning
if params.english_normalization is not None:
self._settings["english_normalization"] = params.english_normalization
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"Parameter `english_normalization` is deprecated and will be removed in a future version. Use `text_normalization` instead.",
DeprecationWarning,
)
self._settings["voice_setting"]["text_normalization"] = params.english_normalization
# Add text_normalization if provided (corrected parameter name)
if params.text_normalization is not None:
self._settings["voice_setting"]["text_normalization"] = params.text_normalization
# Add latex_read if provided
if params.latex_read is not None:
self._settings["voice_setting"]["latex_read"] = params.latex_read
def can_generate_metrics(self) -> bool:
"""Check if this service can generate processing metrics.
@@ -194,15 +244,6 @@ class MiniMaxHttpTTSService(TTSService):
"""
return True
@property
def includes_inter_frame_spaces(self) -> bool:
"""Indicates that MiniMax TTSTextFrames include necessary inter-frame spaces.
Returns:
True, indicating that MiniMax's text frames include necessary inter-frame spaces.
"""
return True
def language_to_service_language(self, language: Language) -> Optional[str]:
"""Convert a Language enum to MiniMax service language format.
@@ -240,7 +281,7 @@ class MiniMaxHttpTTSService(TTSService):
"""
await super().start(frame)
self._settings["audio_setting"]["sample_rate"] = self.sample_rate
logger.debug(f"MiniMax TTS initialized with sample rate: {self.sample_rate}")
logger.debug(f"MiniMax TTS initialized with sample_rate: {self.sample_rate}")
@traced_tts
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
@@ -273,7 +314,6 @@ class MiniMaxHttpTTSService(TTSService):
) as response:
if response.status != 200:
error_message = f"MiniMax TTS error: HTTP {response.status}"
logger.error(error_message)
yield ErrorFrame(error=error_message)
return
@@ -339,16 +379,19 @@ class MiniMaxHttpTTSService(TTSService):
num_channels=1,
)
except ValueError as e:
logger.error(f"Error converting hex to binary: {e}")
logger.error(
f"Error converting hex to binary: {e}",
)
continue
except json.JSONDecodeError as e:
logger.error(f"Error decoding JSON: {e}, data: {data_block[:100]}")
logger.error(
f"Error decoding JSON: {e}, data: {data_block[:100]}",
)
continue
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield ErrorFrame(error=f"Unknown error occurred: {e}", exception=e)
finally:
await self.stop_ttfb_metrics()
yield TTSStoppedFrame()

View File

@@ -110,7 +110,6 @@ class MoondreamService(VisionService):
if analysis fails.
"""
if not self._model:
logger.error(f"{self} error: Moondream model not available ({self.model_name})")
yield ErrorFrame("Moondream model not available")
return

View File

@@ -151,15 +151,6 @@ class NeuphonicTTSService(InterruptibleTTSService):
"""
return True
@property
def includes_inter_frame_spaces(self) -> bool:
"""Indicates that Neuphonic TTSTextFrames include necessary inter-frame spaces.
Returns:
True, indicating that Neuphonic's text frames include necessary inter-frame spaces.
"""
return True
def language_to_service_language(self, language: Language) -> Optional[str]:
"""Convert a Language enum to Neuphonic service language format.
@@ -294,8 +285,7 @@ class NeuphonicTTSService(InterruptibleTTSService):
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
@@ -308,8 +298,7 @@ class NeuphonicTTSService(InterruptibleTTSService):
logger.debug("Disconnecting from Neuphonic")
await self._websocket.close()
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
finally:
self._started = False
self._websocket = None
@@ -374,16 +363,14 @@ class NeuphonicTTSService(InterruptibleTTSService):
await self._send_text(text)
await self.start_tts_usage_metrics(text)
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield ErrorFrame(error=f"Unknown error occurred: {e}")
yield TTSStoppedFrame()
await self._disconnect()
await self._connect()
return
yield None
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield ErrorFrame(error=f"Unknown error occurred: {e}")
class NeuphonicHttpTTSService(TTSService):
@@ -449,15 +436,6 @@ class NeuphonicHttpTTSService(TTSService):
"""
return True
@property
def includes_inter_frame_spaces(self) -> bool:
"""Indicates that Neuphonic TTSTextFrames include necessary inter-frame spaces.
Returns:
True, indicating that Neuphonic's text frames include necessary inter-frame spaces.
"""
return True
def language_to_service_language(self, language: Language) -> Optional[str]:
"""Convert a Language enum to Neuphonic service language format.
@@ -556,7 +534,6 @@ class NeuphonicHttpTTSService(TTSService):
if response.status != 200:
error_text = await response.text()
error_message = f"Neuphonic API error: HTTP {response.status} - {error_text}"
logger.error(error_message)
yield ErrorFrame(error=error_message)
return
@@ -586,8 +563,7 @@ class NeuphonicHttpTTSService(TTSService):
yield TTSAudioRawFrame(audio_bytes, self.sample_rate, 1)
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield ErrorFrame(error=f"Unknown error occurred: {e}")
# Don't yield error frame for individual message failures
continue
@@ -595,8 +571,7 @@ class NeuphonicHttpTTSService(TTSService):
logger.debug("TTS generation cancelled")
raise
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield ErrorFrame(error=f"Unknown error occurred: {e}")
finally:
await self.stop_ttfb_metrics()
yield TTSStoppedFrame()

View File

@@ -13,13 +13,7 @@ from typing import Any, Dict, List, Mapping, Optional
import httpx
from loguru import logger
from openai import (
NOT_GIVEN,
APITimeoutError,
AsyncOpenAI,
AsyncStream,
DefaultAsyncHttpxClient,
)
from openai import NOT_GIVEN, APITimeoutError, AsyncOpenAI, AsyncStream, DefaultAsyncHttpxClient
from openai.types.chat import ChatCompletionChunk, ChatCompletionMessageParam
from pydantic import BaseModel, Field
@@ -346,11 +340,17 @@ class BaseOpenAILLMService(LLMService):
if chunk.usage.prompt_tokens_details
else None
)
reasoning_tokens = (
chunk.usage.completion_tokens_details.reasoning_tokens
if chunk.usage.completion_tokens_details
else None
)
tokens = LLMTokenUsage(
prompt_tokens=chunk.usage.prompt_tokens,
completion_tokens=chunk.usage.completion_tokens,
total_tokens=chunk.usage.total_tokens,
cache_read_input_tokens=cached_tokens,
reasoning_tokens=reasoning_tokens,
)
await self.start_llm_usage_metrics(tokens)
@@ -390,16 +390,20 @@ class BaseOpenAILLMService(LLMService):
# Keep iterating through the response to collect all the argument fragments
arguments += tool_call.function.arguments
elif chunk.choices[0].delta.content:
frame = LLMTextFrame(chunk.choices[0].delta.content)
frame.includes_inter_frame_spaces = True
await self.push_frame(frame)
await self.push_frame(
LLMTextFrame(chunk.choices[0].delta.content, skip_tts=self._get_skip_tts())
)
# When gpt-4o-audio / gpt-4o-mini-audio is used for llm or stt+llm
# we need to get LLMTextFrame for the transcript
elif hasattr(chunk.choices[0].delta, "audio") and chunk.choices[0].delta.audio.get(
"transcript"
):
await self.push_frame(LLMTextFrame(chunk.choices[0].delta.audio["transcript"]))
await self.push_frame(
LLMTextFrame(
chunk.choices[0].delta.audio["transcript"], skip_tts=self._get_skip_tts()
)
)
# if we got a function name and arguments, check to see if it's a function with
# a registered handler. If so, run the registered callback, save the result to
@@ -459,11 +463,11 @@ class BaseOpenAILLMService(LLMService):
if context:
try:
await self.push_frame(LLMFullResponseStartFrame())
await self.push_frame(LLMFullResponseStartFrame(skip_tts=self._get_skip_tts()))
await self.start_processing_metrics()
await self._process_context(context)
except httpx.TimeoutException:
await self._call_event_handler("on_completion_timeout")
finally:
await self.stop_processing_metrics()
await self.push_frame(LLMFullResponseEndFrame())
await self.push_frame(LLMFullResponseEndFrame(skip_tts=self._get_skip_tts()))

View File

@@ -76,7 +76,6 @@ class OpenAIImageGenService(ImageGenService):
image_url = image.data[0].url
if not image_url:
logger.error(f"{self} No image provided in response: {image}")
yield ErrorFrame("Image generation failed")
return

View File

@@ -15,10 +15,9 @@ from typing import Optional
from loguru import logger
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.adapters.services.open_ai_realtime_adapter import (
OpenAIRealtimeLLMAdapter,
)
from pipecat.adapters.services.open_ai_realtime_adapter import OpenAIRealtimeLLMAdapter
from pipecat.frames.frames import (
AggregationType,
BotStoppedSpeakingFrame,
CancelFrame,
EndFrame,
@@ -56,7 +55,6 @@ from pipecat.processors.aggregators.openai_llm_context import (
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.llm_service import FunctionCallFromLLM, LLMService
from pipecat.services.openai.llm import OpenAIContextAggregatorPair
from pipecat.transcriptions.language import Language
from pipecat.utils.time import time_now_iso8601
from pipecat.utils.tracing.service_decorators import traced_openai_realtime, traced_stt
@@ -284,7 +282,7 @@ class OpenAIRealtimeLLMService(LLMService):
await self._truncate_current_audio_response()
await self.stop_all_metrics()
if self._current_assistant_response:
await self.push_frame(LLMFullResponseEndFrame())
await self.push_frame(LLMFullResponseEndFrame(skip_tts=self._get_skip_tts()))
# Only push TTSStoppedFrame if audio modality is enabled
if self._is_modality_enabled("audio"):
await self.push_frame(TTSStoppedFrame())
@@ -443,7 +441,7 @@ class OpenAIRealtimeLLMService(LLMService):
)
self._receive_task = self.create_task(self._receive_task_handler())
except Exception as e:
logger.error(f"{self} initialization error: {e}")
await self.push_error(error_msg=f"Error connecting: {e}", exception=e)
self._websocket = None
async def _disconnect(self):
@@ -460,7 +458,7 @@ class OpenAIRealtimeLLMService(LLMService):
self._completed_tool_calls = set()
self._disconnecting = False
except Exception as e:
logger.error(f"{self} error disconnecting: {e}")
await self.push_error(error_msg=f"Error disconnecting: {e}", exception=e)
async def _ws_send(self, realtime_message):
try:
@@ -473,12 +471,11 @@ class OpenAIRealtimeLLMService(LLMService):
# somehow *started* the websocket send attempt while we still
# had a connection)
return
logger.error(f"Error sending message to websocket: {e}")
# In server-to-server contexts, a WebSocket error should be quite rare. Given how hard
# it is to recover from a send-side error with proper state management, and that exponential
# backoff for retries can have cost/stability implications for a service cluster, let's just
# treat a send-side error as fatal.
await self.push_error(ErrorFrame(error=f"Error sending client event: {e}"))
await self.push_error(error_msg=f"Error sending client event: {e}", exception=e)
async def _update_settings(self):
settings = self._session_properties
@@ -609,7 +606,7 @@ class OpenAIRealtimeLLMService(LLMService):
if evt.item.role == "assistant":
self._current_assistant_response = evt.item
await self.push_frame(LLMFullResponseStartFrame())
await self.push_frame(LLMFullResponseStartFrame(skip_tts=self._get_skip_tts()))
async def _handle_evt_conversation_item_done(self, evt):
"""Handle conversation.item.done event - item is fully completed."""
@@ -656,18 +653,25 @@ class OpenAIRealtimeLLMService(LLMService):
async def _handle_evt_response_done(self, evt):
# todo: figure out whether there's anything we need to do for "cancelled" events
# usage metrics
cached_tokens = (
evt.response.usage.input_token_details.cached_tokens
if hasattr(evt.response.usage, "input_token_details")
and evt.response.usage.input_token_details
else None
)
tokens = LLMTokenUsage(
prompt_tokens=evt.response.usage.input_tokens,
completion_tokens=evt.response.usage.output_tokens,
total_tokens=evt.response.usage.total_tokens,
cache_read_input_tokens=cached_tokens,
)
await self.start_llm_usage_metrics(tokens)
await self.stop_processing_metrics()
await self.push_frame(LLMFullResponseEndFrame())
await self.push_frame(LLMFullResponseEndFrame(skip_tts=self._get_skip_tts()))
self._current_assistant_response = None
# error handling
if evt.response.status == "failed":
await self.push_error(ErrorFrame(error=evt.response.status_details["error"]["message"]))
await self.push_error(error_msg=evt.response.status_details["error"]["message"])
return
# response content
for item in evt.response.output:
@@ -677,16 +681,14 @@ class OpenAIRealtimeLLMService(LLMService):
# We receive text deltas (as opposed to audio transcript deltas) when
# the output modality is "text"
if evt.delta:
frame = LLMTextFrame(evt.delta)
# OpenAI Realtime text already includes any necessary inter-chunk spaces
frame.includes_inter_frame_spaces = True
frame = LLMTextFrame(evt.delta, skip_tts=self._get_skip_tts())
await self.push_frame(frame)
async def _handle_evt_audio_transcript_delta(self, evt):
# We receive audio transcript deltas (as opposed to text deltas) when
# the output modality is "audio" (the default)
if evt.delta:
frame = TTSTextFrame(evt.delta)
frame = TTSTextFrame(evt.delta, aggregated_by=AggregationType.SENTENCE)
# OpenAI Realtime text already includes any necessary inter-chunk spaces
frame.includes_inter_frame_spaces = True
await self.push_frame(frame)
@@ -761,7 +763,7 @@ class OpenAIRealtimeLLMService(LLMService):
async def _handle_evt_error(self, evt):
# Errors are fatal to this connection. Send an ErrorFrame.
await self.push_error(ErrorFrame(error=f"Error: {evt}"))
await self.push_error(error_msg=f"Error: {evt}")
#
# state and client events for the current conversation
@@ -811,9 +813,9 @@ class OpenAIRealtimeLLMService(LLMService):
# We're done configuring the LLM for this session
self._llm_needs_conversation_setup = False
logger.debug(f"Creating response")
logger.debug("Creating response")
await self.push_frame(LLMFullResponseStartFrame())
await self.push_frame(LLMFullResponseStartFrame(skip_tts=self._get_skip_tts()))
await self.start_processing_metrics()
await self.start_ttfb_metrics()
await self.send_client_event(

View File

@@ -131,15 +131,6 @@ class OpenAITTSService(TTSService):
"""
return True
@property
def includes_inter_frame_spaces(self) -> bool:
"""Indicates that OpenAI TTSTextFrames include necessary inter-frame spaces.
Returns:
True, indicating that OpenAI's text frames include necessary inter-frame spaces.
"""
return True
async def set_model(self, model: str):
"""Set the TTS model to use.
@@ -215,5 +206,4 @@ class OpenAITTSService(TTSService):
yield frame
yield TTSStoppedFrame()
except BadRequestError as e:
logger.exception(f"{self} error generating TTS: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield ErrorFrame(error=f"Unknown error occurred: {e}")

View File

@@ -79,5 +79,5 @@ class AzureRealtimeBetaLLMService(OpenAIRealtimeBetaLLMService):
)
self._receive_task = self.create_task(self._receive_task_handler())
except Exception as e:
logger.error(f"{self} initialization error: {e}")
await self.push_error(error_msg=f"Error connecting: {e}", exception=e)
self._websocket = None

View File

@@ -17,6 +17,7 @@ from loguru import logger
from pipecat.adapters.services.open_ai_realtime_adapter import OpenAIRealtimeLLMAdapter
from pipecat.frames.frames import (
AggregationType,
BotStoppedSpeakingFrame,
CancelFrame,
EndFrame,
@@ -264,7 +265,7 @@ class OpenAIRealtimeBetaLLMService(LLMService):
await self._truncate_current_audio_response()
await self.stop_all_metrics()
if self._current_assistant_response:
await self.push_frame(LLMFullResponseEndFrame())
await self.push_frame(LLMFullResponseEndFrame(skip_tts=self._get_skip_tts()))
# Only push TTSStoppedFrame if audio modality is enabled
if self._is_modality_enabled("audio"):
await self.push_frame(TTSStoppedFrame())
@@ -424,7 +425,7 @@ class OpenAIRealtimeBetaLLMService(LLMService):
)
self._receive_task = self.create_task(self._receive_task_handler())
except Exception as e:
logger.error(f"{self} initialization error: {e}")
await self.push_error(error_msg=f"Error connecting: {e}", exception=e)
self._websocket = None
async def _disconnect(self):
@@ -440,7 +441,7 @@ class OpenAIRealtimeBetaLLMService(LLMService):
self._receive_task = None
self._disconnecting = False
except Exception as e:
logger.error(f"{self} error disconnecting: {e}")
await self.push_error(error_msg=f"Error disconnecting: {e}", exception=e)
async def _ws_send(self, realtime_message):
try:
@@ -449,12 +450,11 @@ class OpenAIRealtimeBetaLLMService(LLMService):
except Exception as e:
if self._disconnecting:
return
logger.error(f"Error sending message to websocket: {e}")
# In server-to-server contexts, a WebSocket error should be quite rare. Given how hard
# it is to recover from a send-side error with proper state management, and that exponential
# backoff for retries can have cost/stability implications for a service cluster, let's just
# treat a send-side error as fatal.
await self.push_error(ErrorFrame(error=f"Error sending client event: {e}"))
await self.push_error(error_msg=f"Error sending client event: {e}", exception=e)
async def _update_settings(self):
settings = self._session_properties
@@ -564,7 +564,7 @@ class OpenAIRealtimeBetaLLMService(LLMService):
self._user_and_response_message_tuple = (evt.item, {"done": False, "output": []})
elif evt.item.role == "assistant":
self._current_assistant_response = evt.item
await self.push_frame(LLMFullResponseStartFrame())
await self.push_frame(LLMFullResponseStartFrame(skip_tts=self._get_skip_tts()))
async def _handle_evt_input_audio_transcription_delta(self, evt):
if self._send_transcription_frames:
@@ -623,7 +623,7 @@ class OpenAIRealtimeBetaLLMService(LLMService):
)
await self.start_llm_usage_metrics(tokens)
await self.stop_processing_metrics()
await self.push_frame(LLMFullResponseEndFrame())
await self.push_frame(LLMFullResponseEndFrame(skip_tts=self._get_skip_tts()))
self._current_assistant_response = None
# error handling
if evt.response.status == "failed":
@@ -647,12 +647,12 @@ class OpenAIRealtimeBetaLLMService(LLMService):
async def _handle_evt_text_delta(self, evt):
if evt.delta:
await self.push_frame(LLMTextFrame(evt.delta))
await self.push_frame(LLMTextFrame(evt.delta, skip_tts=self._get_skip_tts()))
async def _handle_evt_audio_transcript_delta(self, evt):
if evt.delta:
await self.push_frame(LLMTextFrame(evt.delta))
await self.push_frame(TTSTextFrame(evt.delta))
await self.push_frame(LLMTextFrame(evt.delta, skip_tts=self._get_skip_tts()))
await self.push_frame(TTSTextFrame(evt.delta, aggregated_by=AggregationType.SENTENCE))
async def _handle_evt_speech_started(self, evt):
await self._truncate_current_audio_response()
@@ -685,7 +685,7 @@ class OpenAIRealtimeBetaLLMService(LLMService):
async def _handle_evt_error(self, evt):
# Errors are fatal to this connection. Send an ErrorFrame.
await self.push_error(ErrorFrame(error=f"Error: {evt}"))
await self.push_error(error_msg=f"Error: {evt}")
async def _handle_assistant_output(self, output):
# We haven't seen intermixed audio and function_call items in the same response. But let's
@@ -747,7 +747,7 @@ class OpenAIRealtimeBetaLLMService(LLMService):
logger.debug(f"Creating response: {self._context.get_messages_for_logging()}")
await self.push_frame(LLMFullResponseStartFrame())
await self.push_frame(LLMFullResponseStartFrame(skip_tts=self._get_skip_tts()))
await self.start_processing_metrics()
await self.start_ttfb_metrics()
await self.send_client_event(

View File

@@ -66,15 +66,6 @@ class PiperTTSService(TTSService):
"""
return True
@property
def includes_inter_frame_spaces(self) -> bool:
"""Indicates that Piper TTSTextFrames include necessary inter-frame spaces.
Returns:
True, indicating that Piper's text frames include necessary inter-frame spaces.
"""
return True
@traced_tts
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
"""Generate speech from text using Piper's HTTP API.
@@ -97,9 +88,6 @@ class PiperTTSService(TTSService):
) as response:
if response.status != 200:
error = await response.text()
logger.error(
f"{self} error getting audio (status: {response.status}, error: {error})"
)
yield ErrorFrame(
error=f"Error getting audio (status: {response.status}, error: {error})"
)
@@ -118,7 +106,7 @@ class PiperTTSService(TTSService):
yield frame
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield ErrorFrame(error=f"Unknown error occurred: {e}")
finally:
logger.debug(f"{self}: Finished TTS [{text}]")
await self.stop_ttfb_metrics()

View File

@@ -266,8 +266,7 @@ class PlayHTTTSService(InterruptibleTTSService):
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Error connecting: {e}", exception=e)
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
@@ -280,8 +279,7 @@ class PlayHTTTSService(InterruptibleTTSService):
logger.debug("Disconnecting from PlayHT")
await self._websocket.close()
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Error disconnecting: {e}", exception=e)
finally:
self._request_id = None
self._websocket = None
@@ -351,8 +349,7 @@ class PlayHTTTSService(InterruptibleTTSService):
await self.push_frame(TTSStoppedFrame())
self._request_id = None
elif "error" in msg:
logger.error(f"{self} error: {msg}")
await self.push_error(ErrorFrame(error=f"{self} error: {msg['error']}"))
await self.push_error(error_msg=f"Error: {msg['error']}")
except json.JSONDecodeError:
logger.error(f"Invalid JSON message: {message}")
@@ -394,8 +391,7 @@ class PlayHTTTSService(InterruptibleTTSService):
await self._get_websocket().send(json.dumps(tts_command))
await self.start_tts_usage_metrics(text)
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield ErrorFrame(error=f"Unknown error occurred: {e}")
yield TTSStoppedFrame()
await self._disconnect()
await self._connect()
@@ -405,8 +401,7 @@ class PlayHTTTSService(InterruptibleTTSService):
yield None
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield ErrorFrame(error=f"Unknown error occurred: {e}")
class PlayHTHttpTTSService(TTSService):
@@ -626,8 +621,7 @@ class PlayHTHttpTTSService(TTSService):
yield frame
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield ErrorFrame(error=f"Unknown error occurred: {e}")
finally:
await self.stop_ttfb_metrics()
yield TTSStoppedFrame()

View File

@@ -113,6 +113,10 @@ class RimeTTSService(AudioContextWordTTSService):
sample_rate: Audio sample rate in Hz.
params: Additional configuration parameters.
text_aggregator: Custom text aggregator for processing input text.
.. deprecated:: 0.0.95
Use an LLMTextProcessor before the TTSService for custom text aggregation.
aggregate_sentences: Whether to aggregate sentences within the TTSService.
**kwargs: Additional arguments passed to parent class.
"""
@@ -123,10 +127,17 @@ class RimeTTSService(AudioContextWordTTSService):
push_stop_frames=True,
pause_frame_processing=True,
sample_rate=sample_rate,
text_aggregator=text_aggregator or SkipTagsAggregator([("spell(", ")")]),
**kwargs,
)
if not text_aggregator:
# Always skip tags added for spelled-out text
# Note: This is primarily to support backwards compatibility.
# The preferred way of taking advantage of Rime spelling is
# to use an LLMTextProcessor and/or a text_transformer to identify
# and insert these tags for the purpose of the TTS service alone.
self._text_aggregator = SkipTagsAggregator([("spell(", ")")])
params = params or RimeTTSService.InputParams()
# Store service configuration
@@ -152,6 +163,7 @@ class RimeTTSService(AudioContextWordTTSService):
self._context_id = None # Tracks current turn
self._receive_task = None
self._cumulative_time = 0 # Accumulates time across messages
self._extra_msg_fields = {} # Extra fields for next message
def can_generate_metrics(self) -> bool:
"""Check if this service can generate processing metrics.
@@ -181,6 +193,31 @@ class RimeTTSService(AudioContextWordTTSService):
self._model = model
await super().set_model(model)
# A set of Rime-specific helpers for text transformations
def SPELL(text: str) -> str:
"""Wrap text in Rime spell function."""
return f"spell({text})"
def PAUSE_TAG(seconds: float) -> str:
"""Convenience method to create a pause tag."""
return f"<{seconds * 1000}>"
def PRONOUNCE(self, text: str, word: str, phoneme: str) -> str:
"""Convenience method to support Rime's custom pronunciations feature.
https://docs.rime.ai/api-reference/custom-pronunciation
"""
self._extra_msg_fields["phonemizeBetweenBrackets"] = True
return text.replace(word, f"{phoneme}")
def INLINE_SPEED(self, text: str, speed: float) -> str:
"""Convenience method to support inline speeds."""
if not self._extra_msg_fields:
self._extra_msg_fields = {}
speed_vals = self._extra_msg_fields.get("inlineSpeedAlpha", "").split(",")
self._extra_msg_fields["inlineSpeedAlpha"] = ",".join(speed_vals + [str(speed)])
return f"[{text}]"
async def _update_settings(self, settings: Mapping[str, Any]):
"""Update service settings and reconnect if voice changed."""
prev_voice = self._voice_id
@@ -193,7 +230,11 @@ class RimeTTSService(AudioContextWordTTSService):
def _build_msg(self, text: str = "") -> dict:
"""Build JSON message for Rime API."""
return {"text": text, "contextId": self._context_id}
msg = {"text": text, "contextId": self._context_id}
if self._extra_msg_fields:
msg |= self._extra_msg_fields
self._extra_msg_fields = {}
return msg
def _build_clear_msg(self) -> dict:
"""Build clear operation message."""
@@ -259,8 +300,7 @@ class RimeTTSService(AudioContextWordTTSService):
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Error connecting: {e}", exception=e)
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
@@ -272,8 +312,7 @@ class RimeTTSService(AudioContextWordTTSService):
await self._websocket.send(json.dumps(self._build_eos_msg()))
await self._websocket.close()
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Error disconnecting: {e}", exception=e)
finally:
self._context_id = None
self._websocket = None
@@ -366,10 +405,9 @@ class RimeTTSService(AudioContextWordTTSService):
logger.debug(f"Updated cumulative time to: {self._cumulative_time}")
elif msg["type"] == "error":
logger.error(f"{self} error: {msg}")
await self.push_frame(TTSStoppedFrame())
await self.stop_all_metrics()
await self.push_error(ErrorFrame(error=f"{self} error: {msg['message']}"))
await self.push_error(error_msg=f"Error: {msg['message']}")
self._context_id = None
async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
@@ -411,16 +449,14 @@ class RimeTTSService(AudioContextWordTTSService):
await self._get_websocket().send(json.dumps(msg))
await self.start_tts_usage_metrics(text)
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield ErrorFrame(error=f"Unknown error occurred: {e}")
yield TTSStoppedFrame()
await self._disconnect()
await self._connect()
return
yield None
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield ErrorFrame(error=f"Unknown error occurred: {e}")
class RimeHttpTTSService(TTSService):
@@ -501,15 +537,6 @@ class RimeHttpTTSService(TTSService):
"""
return True
@property
def includes_inter_frame_spaces(self) -> bool:
"""Indicates that Rime TTSTextFrames include necessary inter-frame spaces.
Returns:
True, indicating that Rime's text frames include necessary inter-frame spaces.
"""
return True
def language_to_service_language(self, language: Language) -> str | None:
"""Convert pipecat language to Rime language code.
@@ -560,7 +587,6 @@ class RimeHttpTTSService(TTSService):
) as response:
if response.status != 200:
error_message = f"Rime TTS error: HTTP {response.status}"
logger.error(error_message)
yield ErrorFrame(error=error_message)
return
@@ -578,8 +604,7 @@ class RimeHttpTTSService(TTSService):
yield frame
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield ErrorFrame(error=f"Unknown error occurred: {e}")
finally:
await self.stop_ttfb_metrics()
yield TTSStoppedFrame()

View File

@@ -655,12 +655,10 @@ class RivaSegmentedSTTService(SegmentedSTTService):
logger.debug("No transcription results found in Riva response")
except AttributeError as ae:
logger.error(f"Unexpected response structure from Riva: {ae}")
yield ErrorFrame(f"Unexpected Riva response format: {str(ae)}")
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield ErrorFrame(error=f"Unknown error occurred: {e}")
class ParakeetSTTService(RivaSTTService):

View File

@@ -113,15 +113,6 @@ class RivaTTSService(TTSService):
riva.client.proto.riva_tts_pb2.RivaSynthesisConfigRequest()
)
@property
def includes_inter_frame_spaces(self) -> bool:
"""Indicates that Riva TTSTextFrames include necessary inter-frame spaces.
Returns:
True, indicating that Riva's text frames include necessary inter-frame spaces.
"""
return True
async def set_model(self, model: str):
"""Attempt to set the TTS model.
@@ -166,7 +157,6 @@ class RivaTTSService(TTSService):
add_response(None)
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
add_response(None)
await self.start_ttfb_metrics()
@@ -190,7 +180,7 @@ class RivaTTSService(TTSService):
yield frame
resp = await asyncio.wait_for(queue.get(), timeout=RIVA_TTS_TIMEOUT_SECS)
except asyncio.TimeoutError:
logger.error(f"{self} timeout waiting for audio response")
yield ErrorFrame(error=f"Unknown error occurred: {e}")
await self.start_tts_usage_metrics(text)
yield TTSStoppedFrame()

View File

@@ -14,9 +14,7 @@ from openai import AsyncStream
from openai.types.chat import ChatCompletionChunk
from pipecat.adapters.services.open_ai_adapter import OpenAILLMInvocationParams
from pipecat.frames.frames import (
LLMTextFrame,
)
from pipecat.frames.frames import LLMTextFrame
from pipecat.metrics.metrics import LLMTokenUsage
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
@@ -176,16 +174,20 @@ class SambaNovaLLMService(OpenAILLMService): # type: ignore
# Keep iterating through the response to collect all the argument fragments
arguments += tool_call.function.arguments
elif chunk.choices[0].delta.content:
frame = LLMTextFrame(chunk.choices[0].delta.content)
frame.includes_inter_frame_spaces = True
await self.push_frame(frame)
await self.push_frame(
LLMTextFrame(chunk.choices[0].delta.content, skip_tts=self._get_skip_tts())
)
# When gpt-4o-audio / gpt-4o-mini-audio is used for llm or stt+llm
# we need to get LLMTextFrame for the transcript
elif hasattr(chunk.choices[0].delta, "audio") and chunk.choices[0].delta.audio.get(
"transcript"
):
await self.push_frame(LLMTextFrame(chunk.choices[0].delta.audio["transcript"]))
await self.push_frame(
LLMTextFrame(
chunk.choices[0].delta.audio["transcript"], skip_tts=self._get_skip_tts()
)
)
# if we got a function name and arguments, check to see if it's a function with
# a registered handler. If so, run the registered callback, save the result to

View File

@@ -275,8 +275,7 @@ class SarvamSTTService(STTService):
await self._socket_client.translate(**method_kwargs)
except Exception as e:
logger.error(f"Error sending audio to Sarvam: {e}")
await self.push_error(ErrorFrame(f"Failed to send audio: {e}"))
yield ErrorFrame(error=f"Error sending audio to Sarvam: {e}", exception=e)
yield None
@@ -332,13 +331,11 @@ class SarvamSTTService(STTService):
logger.info("Connected to Sarvam successfully")
except ApiError as e:
logger.error(f"Sarvam API error: {e}")
await self.push_error(ErrorFrame(f"Sarvam API error: {e}"))
await self.push_error(error_msg=f"Sarvam API error: {e}", exception=e)
except Exception as e:
logger.error(f"Failed to connect to Sarvam: {e}")
self._socket_client = None
self._websocket_context = None
await self.push_error(ErrorFrame(f"Failed to connect to Sarvam: {e}"))
await self.push_error(error_msg=f"Failed to connect to Sarvam: {e}", exception=e)
async def _disconnect(self):
"""Disconnect from Sarvam WebSocket API using SDK."""
@@ -351,7 +348,9 @@ class SarvamSTTService(STTService):
# Exit the async context manager
await self._websocket_context.__aexit__(None, None, None)
except Exception as e:
logger.error(f"Error closing WebSocket connection: {e}")
await self.push_error(
error_msg=f"Error closing WebSocket connection: {e}", exception=e
)
finally:
logger.debug("Disconnected from Sarvam WebSocket")
self._socket_client = None
@@ -371,8 +370,7 @@ class SarvamSTTService(STTService):
# Messages will be handled via the _message_handler callback
await self._socket_client.start_listening()
except Exception as e:
logger.error(f"Error in Sarvam receive task: {e}")
await self.push_error(ErrorFrame(f"Sarvam receive task error: {e}"))
await self.push_error(error_msg=f"Sarvam receive task error: {e}", exception=e)
async def _handle_message(self, message):
"""Handle incoming WebSocket message from Sarvam SDK.
@@ -427,8 +425,7 @@ class SarvamSTTService(STTService):
await self.stop_processing_metrics()
except Exception as e:
logger.error(f"Error handling Sarvam message: {e}")
await self.push_error(ErrorFrame(f"Failed to handle message: {e}"))
await self.push_error(error_msg=f"Failed to handle message: {e}", exception=e)
await self.stop_all_metrics()
@traced_stt

View File

@@ -195,15 +195,6 @@ class SarvamHttpTTSService(TTSService):
"""
return True
@property
def includes_inter_frame_spaces(self) -> bool:
"""Indicates that Sarvam TTSTextFrames include necessary inter-frame spaces.
Returns:
True, indicating that Sarvam's text frames include necessary inter-frame spaces.
"""
return True
def language_to_service_language(self, language: Language) -> Optional[str]:
"""Convert a Language enum to Sarvam AI language format.
@@ -263,8 +254,7 @@ class SarvamHttpTTSService(TTSService):
async with self._session.post(url, json=payload, headers=headers) as response:
if response.status != 200:
error_text = await response.text()
logger.error(f"Sarvam API error: {error_text}")
await self.push_error(ErrorFrame(error=f"Sarvam API error: {error_text}"))
yield ErrorFrame(error=f"Sarvam API error: {error_text}")
return
response_data = await response.json()
@@ -273,8 +263,7 @@ class SarvamHttpTTSService(TTSService):
# Decode base64 audio data
if "audios" not in response_data or not response_data["audios"]:
logger.error("No audio data received from Sarvam API")
await self.push_error(ErrorFrame(error="No audio data received"))
yield ErrorFrame(error="No audio data received")
return
# Get the first audio (there should be only one for single text input)
@@ -295,8 +284,7 @@ class SarvamHttpTTSService(TTSService):
yield frame
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
yield ErrorFrame(error=f"Error generating TTS: {e}", exception=e)
finally:
await self.stop_ttfb_metrics()
yield TTSStoppedFrame()
@@ -467,15 +455,6 @@ class SarvamTTSService(InterruptibleTTSService):
"""
return True
@property
def includes_inter_frame_spaces(self) -> bool:
"""Indicates that Sarvam TTSTextFrames include necessary inter-frame spaces.
Returns:
True, indicating that Sarvam's text frames include necessary inter-frame spaces.
"""
return True
def language_to_service_language(self, language: Language) -> Optional[str]:
"""Convert a Language enum to Sarvam AI language format.
@@ -578,8 +557,7 @@ class SarvamTTSService(InterruptibleTTSService):
await self._disconnect_websocket()
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
finally:
# Reset state only after everything is cleaned up
self._started = False
@@ -603,8 +581,9 @@ class SarvamTTSService(InterruptibleTTSService):
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(
error_msg=f"Error connecting to Sarvam TTS Websocket: {e}", exception=e
)
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
@@ -620,8 +599,7 @@ class SarvamTTSService(InterruptibleTTSService):
await self._websocket.send(json.dumps(config_message))
logger.debug("Configuration sent successfully")
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
raise
async def _disconnect_websocket(self):
@@ -633,8 +611,7 @@ class SarvamTTSService(InterruptibleTTSService):
logger.debug("Disconnecting from Sarvam")
await self._websocket.close()
except Exception as e:
logger.error(f"{self} error closing websocket: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Error closing websocket: {e}", exception=e)
finally:
self._started = False
self._websocket = None
@@ -658,7 +635,7 @@ class SarvamTTSService(InterruptibleTTSService):
await self.push_frame(frame)
elif msg.get("type") == "error":
error_msg = msg["data"]["message"]
logger.error(f"TTS Error: {error_msg}")
await self.push_error(error_msg=f"TTS Error: {error_msg}")
# If it's a timeout error, the connection might need to be reset
if "too long" in error_msg.lower() or "timeout" in error_msg.lower():
@@ -720,13 +697,11 @@ class SarvamTTSService(InterruptibleTTSService):
await self._send_text(text)
await self.start_tts_usage_metrics(text)
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield ErrorFrame(error=f"Unknown error occurred: {e}")
yield TTSStoppedFrame()
await self._disconnect()
await self._connect()
return
yield None
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield ErrorFrame(error=f"Unknown error occurred: {e}")

View File

@@ -48,12 +48,14 @@ class SimliVideoService(FrameProcessor):
"""Input parameters for Simli video configuration.
Parameters:
enable_logging: Whether to enable Simli logging.
max_session_length: Absolute maximum session duration in seconds.
Avatar will disconnect after this time even if it's speaking.
max_idle_time: Maximum duration in seconds the avatar is not speaking
before the avatar disconnects.
"""
enable_logging: Optional[bool] = None
max_session_length: Optional[int] = None
max_idle_time: Optional[int] = None
@@ -84,6 +86,10 @@ class SimliVideoService(FrameProcessor):
Please use 'api_key' and 'face_id' parameters instead.
use_turn_server: Whether to use TURN server for connection. Defaults to False.
.. deprecated:: 0.0.95
The 'use_turn_server' parameter is deprecated and will be removed in a future version.
latency_interval: Latency interval setting for sending health checks to check
the latency to Simli Servers. Defaults to 0.
simli_url: URL of the simli servers. Can be changed for custom deployments
@@ -135,15 +141,22 @@ class SimliVideoService(FrameProcessor):
config = SimliConfig(**config_kwargs)
if use_turn_server:
warnings.warn(
"The 'use_turn_server' parameter is deprecated and will be removed in a future version.",
DeprecationWarning,
stacklevel=2,
)
self._initialized = False
# Add buffer time to session limits
config.maxIdleTime += 5
config.maxSessionLength += 5
self._simli_client = SimliClient(
config,
use_turn_server,
latency_interval,
config=config,
latencyInterval=latency_interval,
simliURL=simli_url,
enable_logging=params.enable_logging or False,
)
self._pipecat_resampler: AudioResampler = None
@@ -168,7 +181,7 @@ class SimliVideoService(FrameProcessor):
self._audio_task = self.create_task(self._consume_and_process_audio())
self._video_task = self.create_task(self._consume_and_process_video())
except Exception as e:
logger.error(f"{self}: unable to start connection: {e}")
await self.push_error(error_msg=f"Unable to start connection: {e}", exception=e)
async def _consume_and_process_audio(self):
"""Consume audio frames from Simli and push them downstream."""
@@ -246,7 +259,7 @@ class SimliVideoService(FrameProcessor):
await self._simli_client.send(audioBytes)
return
except Exception as e:
logger.exception(f"{self} exception: {e}")
await self.push_error(error_msg=f"Error sending audio: {e}", exception=e)
elif isinstance(frame, TTSStoppedFrame):
try:
if self._previously_interrupted and len(self._audio_buffer) > 0:
@@ -254,7 +267,7 @@ class SimliVideoService(FrameProcessor):
self._previously_interrupted = False
self._audio_buffer = bytearray()
except Exception as e:
logger.exception(f"{self} exception: {e}")
await self.push_error(error_msg=f"Error stopping TTS: {e}", exception=e)
return
elif isinstance(frame, (EndFrame, CancelFrame)):
await self._stop()

View File

@@ -194,7 +194,7 @@ class SonioxSTTService(STTService):
self._websocket = await websocket_connect(self._url)
if not self._websocket:
logger.error(f"Unable to connect to Soniox API at {self._url}")
await self.push_error(error_msg=f"Unable to connect to Soniox API at {self._url}")
# If vad_force_turn_endpoint is not enabled, we need to enable endpoint detection.
# Either one or the other is required.
@@ -327,8 +327,7 @@ class SonioxSTTService(STTService):
# Expected when closing the connection
logger.debug("WebSocket connection closed, keepalive task stopped.")
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
async def _receive_task_handler(self):
if not self._websocket:
@@ -404,13 +403,8 @@ class SonioxSTTService(STTService):
if error_code or error_message:
# In case of error, still send the final transcript (if any remaining in the buffer).
await send_endpoint_transcript()
logger.error(
f"{self} error: {error_code} (_receive_task_handler) - {error_message}"
)
await self.push_error(
ErrorFrame(
error=f"{self} error: {error_code} (_receive_task_handler) - {error_message}"
)
error_msg=f"Error: {error_code} (_receive_task_handler) - {error_message}"
)
finished = content.get("finished")
@@ -425,5 +419,4 @@ class SonioxSTTService(STTService):
# Expected when closing the connection.
pass
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Error receiving message: {e}", exception=e)

View File

@@ -467,8 +467,7 @@ class SpeechmaticsSTTService(STTService):
await self._client.send_audio(audio)
yield None
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
yield ErrorFrame(error=f"Unknown error occurred: {e}")
await self._disconnect()
def update_params(
@@ -514,8 +513,7 @@ class SpeechmaticsSTTService(STTService):
self._client.send_message(payload), self.get_event_loop()
)
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
raise RuntimeError(f"error sending message to STT: {e}")
async def _connect(self) -> None:
@@ -581,8 +579,7 @@ class SpeechmaticsSTTService(STTService):
logger.debug(f"{self} Connected to Speechmatics STT service")
await self._call_event_handler("on_connected")
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(error_msg=f"Error connecting to Speechmatics: {e}", exception=e)
self._client = None
async def _disconnect(self) -> None:
@@ -596,8 +593,9 @@ class SpeechmaticsSTTService(STTService):
except asyncio.TimeoutError:
logger.warning(f"{self} Timeout while closing Speechmatics client connection")
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
await self.push_error(
error_msg=f"Error disconnecting from Speechmatics: {e}", exception=e
)
finally:
self._client = None
await self._call_event_handler("on_disconnected")

Some files were not shown because too many files have changed in this diff Show More