Compare commits
27 Commits
v0.0.78
...
cb/elevenl
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
40605aef38 | ||
|
|
c246b54d4d | ||
|
|
0502ee2b5a | ||
|
|
9ec047094b | ||
|
|
d991c106c8 | ||
|
|
312fb23c89 | ||
|
|
4d7f21d44e | ||
|
|
ec25d0a7c9 | ||
|
|
2b8218deaa | ||
|
|
11119430cd | ||
|
|
9ca79232c1 | ||
|
|
9ea06c33f7 | ||
|
|
30a1dd202e | ||
|
|
809ab0b7b6 | ||
|
|
2b5db9c562 | ||
|
|
b4a886b59f | ||
|
|
07eb00722b | ||
|
|
96652b8fba | ||
|
|
df1fcf0c68 | ||
|
|
711f740d9e | ||
|
|
a0bda98c20 | ||
|
|
1c1bae35ab | ||
|
|
56c52c2cf2 | ||
|
|
740aee1a1a | ||
|
|
f0391c3280 | ||
|
|
64e48e4660 | ||
|
|
b8147bdbbd |
26
CHANGELOG.md
26
CHANGELOG.md
@@ -5,6 +5,29 @@ All notable changes to **Pipecat** will be documented in this file.
|
||||
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
||||
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||
|
||||
## [0.0.79] - 2025-08-07
|
||||
|
||||
### Changed
|
||||
|
||||
- Changed `pipecat-ai`'s `openai` dependency to `>=1.74.0,<=1.99.1` due to a
|
||||
breaking change in `openai` 1.99.2 ([commit](https://github.com/openai/openai-python/commit/657f551dbe583ffb259d987dafae12c6211fba06))
|
||||
|
||||
### Deprecated
|
||||
|
||||
- `TTSService.say()` is deprecated, push a `TTSSpeakFrame` instead. Calling
|
||||
functions directly is a discouraged pattern in Pipecat because, for example,
|
||||
it might cause issues with frame ordering.
|
||||
|
||||
- `LLMMessagesFrame` is deprecated, in favor of either:
|
||||
|
||||
- `LLMMessagesUpdateFrame` with `run_llm=True`
|
||||
- `OpenAILLMContextFrame` with desired messages in a new context
|
||||
|
||||
- `LLMUserResponseAggregator` and `LLMAssistantResponseAggregator` are
|
||||
deprecated, as they depended on the now-deprecated `LLMMessagesFrame`. Use
|
||||
`LLMUserContextAggregator` and `LLMAssistantResponseAggregator` (or
|
||||
LLM-specific subclasses thereof) instead.
|
||||
|
||||
## [0.0.78] - 2025-08-07
|
||||
|
||||
### Added
|
||||
@@ -53,7 +76,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
- Added Chinese, Japanese, Korean word timestamp support to
|
||||
`CartesiaTTSService`.
|
||||
|
||||
- Added `region` parameter to `GladiaSTTService`. Accepted values: eu-west (default), us-west.
|
||||
- Added `region` parameter to `GladiaSTTService`. Accepted values: eu-west
|
||||
(default), us-west.
|
||||
|
||||
### Changed
|
||||
|
||||
|
||||
@@ -29,6 +29,9 @@ CARTESIA_API_KEY=...
|
||||
DAILY_API_KEY=...
|
||||
DAILY_SAMPLE_ROOM_URL=https://...
|
||||
|
||||
# Deepgram
|
||||
DEEPGRAM_API_KEY=...
|
||||
|
||||
# ElevenLabs
|
||||
ELEVENLABS_API_KEY=...
|
||||
ELEVENLABS_VOICE_ID=...
|
||||
|
||||
@@ -9,10 +9,14 @@ import os
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.frames.frames import EndFrame, LLMMessagesFrame
|
||||
from pipecat.frames.frames import EndFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import (
|
||||
OpenAILLMContext,
|
||||
OpenAILLMContextFrame,
|
||||
)
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.cartesia.tts import CartesiaTTSService
|
||||
@@ -59,7 +63,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
# Register an event handler so we can play the audio when the client joins
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
await task.queue_frames([LLMMessagesFrame(messages), EndFrame()])
|
||||
await task.queue_frames([OpenAILLMContextFrame(OpenAILLMContext(messages)), EndFrame()])
|
||||
|
||||
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
|
||||
|
||||
|
||||
@@ -15,13 +15,16 @@ from pipecat.frames.frames import (
|
||||
DataFrame,
|
||||
Frame,
|
||||
LLMFullResponseStartFrame,
|
||||
LLMMessagesFrame,
|
||||
TextFrame,
|
||||
)
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.sync_parallel_pipeline import SyncParallelPipeline
|
||||
from pipecat.pipeline.task import PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import (
|
||||
OpenAILLMContext,
|
||||
OpenAILLMContextFrame,
|
||||
)
|
||||
from pipecat.processors.aggregators.sentence import SentenceAggregator
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
@@ -153,7 +156,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
}
|
||||
]
|
||||
frames.append(MonthFrame(month=month))
|
||||
frames.append(LLMMessagesFrame(messages))
|
||||
frames.append(OpenAILLMContextFrame(OpenAILLMContext(messages)))
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
|
||||
@@ -15,7 +15,6 @@ from loguru import logger
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
Frame,
|
||||
LLMMessagesFrame,
|
||||
OutputAudioRawFrame,
|
||||
TextFrame,
|
||||
TTSAudioRawFrame,
|
||||
@@ -25,6 +24,10 @@ from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.sync_parallel_pipeline import SyncParallelPipeline
|
||||
from pipecat.pipeline.task import PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import (
|
||||
OpenAILLMContext,
|
||||
OpenAILLMContextFrame,
|
||||
)
|
||||
from pipecat.processors.aggregators.sentence import SentenceAggregator
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.services.cartesia.tts import CartesiaHttpTTSService
|
||||
@@ -137,7 +140,7 @@ async def main():
|
||||
)
|
||||
|
||||
task = PipelineTask(pipeline)
|
||||
await task.queue_frame(LLMMessagesFrame(messages))
|
||||
await task.queue_frame(OpenAILLMContextFrame(OpenAILLMContext(messages)))
|
||||
await task.stop_when_done()
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
@@ -16,13 +16,16 @@ from langchain_openai import ChatOpenAI
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import LLMMessagesFrame
|
||||
from pipecat.frames.frames import LLMMessagesUpdateFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.llm_response import (
|
||||
LLMAssistantResponseAggregator,
|
||||
LLMUserResponseAggregator,
|
||||
LLMAssistantContextAggregator,
|
||||
LLMUserContextAggregator,
|
||||
)
|
||||
from pipecat.processors.aggregators.openai_llm_context import (
|
||||
OpenAILLMContext,
|
||||
)
|
||||
from pipecat.processors.frameworks.langchain import LangchainProcessor
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
@@ -97,8 +100,9 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
)
|
||||
lc = LangchainProcessor(history_chain)
|
||||
|
||||
tma_in = LLMUserResponseAggregator()
|
||||
tma_out = LLMAssistantResponseAggregator()
|
||||
context = OpenAILLMContext()
|
||||
tma_in = LLMUserContextAggregator(context=context)
|
||||
tma_out = LLMAssistantContextAggregator(context=context)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
@@ -125,11 +129,11 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected")
|
||||
# Kick off the conversation.
|
||||
# the `LLMMessagesFrame` will be picked up by the LangchainProcessor using
|
||||
# An `OpenAILLMContextFrame` will be picked up by the LangchainProcessor using
|
||||
# only the content of the last message to inject it in the prompt defined
|
||||
# above. So no role is required here.
|
||||
messages = [({"content": "Please briefly introduce yourself to the user."})]
|
||||
await task.queue_frames([LLMMessagesFrame(messages)])
|
||||
await task.queue_frames([LLMMessagesUpdateFrame(messages, run_llm=True)])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
|
||||
@@ -6,9 +6,13 @@ from typing import Tuple
|
||||
import aiohttp
|
||||
from dotenv import load_dotenv
|
||||
|
||||
from pipecat.frames.frames import AudioFrame, EndFrame, ImageFrame, LLMMessagesFrame, TextFrame
|
||||
from pipecat.frames.frames import AudioFrame, EndFrame, ImageFrame, TextFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.processors.aggregators import SentenceAggregator
|
||||
from pipecat.processors.aggregators.openai_llm_context import (
|
||||
OpenAILLMContext,
|
||||
OpenAILLMContextFrame,
|
||||
)
|
||||
from pipecat.runner.daily import configure
|
||||
from pipecat.services.azure import AzureLLMService, AzureTTSService
|
||||
from pipecat.services.elevenlabs import ElevenLabsTTSService
|
||||
@@ -79,7 +83,7 @@ async def main():
|
||||
sentence_aggregator = SentenceAggregator()
|
||||
pipeline = Pipeline([llm, sentence_aggregator, tts1], source_queue, sink_queue)
|
||||
|
||||
await source_queue.put(LLMMessagesFrame(messages))
|
||||
await source_queue.put(OpenAILLMContextFrame(OpenAILLMContext(messages)))
|
||||
await source_queue.put(EndFrame())
|
||||
await pipeline.run_pipeline()
|
||||
|
||||
|
||||
@@ -11,7 +11,7 @@ from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import Frame, TextFrame, UserImageRequestFrame
|
||||
from pipecat.frames.frames import Frame, TextFrame, TTSSpeakFrame, UserImageRequestFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineTask
|
||||
@@ -119,7 +119,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
image_requester.set_participant_id(client_id)
|
||||
|
||||
# Welcome message
|
||||
await tts.say("Hi there! Feel free to ask me what I see.")
|
||||
await task.queue_frame(TTSSpeakFrame("Hi there! Feel free to ask me what I see."))
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
|
||||
@@ -11,7 +11,7 @@ from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import Frame, TextFrame, UserImageRequestFrame
|
||||
from pipecat.frames.frames import Frame, TextFrame, TTSSpeakFrame, UserImageRequestFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
@@ -123,7 +123,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
image_requester.set_participant_id(client_id)
|
||||
|
||||
# Welcome message
|
||||
await tts.say("Hi there! Feel free to ask me what I see.")
|
||||
await task.queue_frame(TTSSpeakFrame("Hi there! Feel free to ask me what I see."))
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
|
||||
@@ -11,7 +11,7 @@ from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import Frame, TextFrame, UserImageRequestFrame
|
||||
from pipecat.frames.frames import Frame, TextFrame, TTSSpeakFrame, UserImageRequestFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
@@ -123,7 +123,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
image_requester.set_participant_id(client_id)
|
||||
|
||||
# Welcome message
|
||||
await tts.say("Hi there! Feel free to ask me what I see.")
|
||||
await task.queue_frame(TTSSpeakFrame("Hi there! Feel free to ask me what I see."))
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
|
||||
@@ -11,7 +11,7 @@ from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import Frame, TextFrame, UserImageRequestFrame
|
||||
from pipecat.frames.frames import Frame, TextFrame, TTSSpeakFrame, UserImageRequestFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
@@ -123,7 +123,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
image_requester.set_participant_id(client_id)
|
||||
|
||||
# Welcome message
|
||||
await tts.say("Hi there! Feel free to ask me what I see.")
|
||||
await task.queue_frame(TTSSpeakFrame("Hi there! Feel free to ask me what I see."))
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
|
||||
@@ -11,7 +11,7 @@ from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import EndFrame, LLMMessagesFrame, TTSSpeakFrame
|
||||
from pipecat.frames.frames import EndFrame, LLMMessagesAppendFrame, TTSSpeakFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
@@ -75,23 +75,19 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
async def handle_user_idle(user_idle: UserIdleProcessor, retry_count: int) -> bool:
|
||||
if retry_count == 1:
|
||||
# First attempt: Add a gentle prompt to the conversation
|
||||
messages.append(
|
||||
{
|
||||
"role": "system",
|
||||
"content": "The user has been quiet. Politely and briefly ask if they're still there.",
|
||||
}
|
||||
)
|
||||
await user_idle.push_frame(LLMMessagesFrame(messages))
|
||||
message = {
|
||||
"role": "system",
|
||||
"content": "The user has been quiet. Politely and briefly ask if they're still there.",
|
||||
}
|
||||
await user_idle.push_frame(LLMMessagesAppendFrame([message], run_llm=True))
|
||||
return True
|
||||
elif retry_count == 2:
|
||||
# Second attempt: More direct prompt
|
||||
messages.append(
|
||||
{
|
||||
"role": "system",
|
||||
"content": "The user is still inactive. Ask if they'd like to continue our conversation.",
|
||||
}
|
||||
)
|
||||
await user_idle.push_frame(LLMMessagesFrame(messages))
|
||||
message = {
|
||||
"role": "system",
|
||||
"content": "The user is still inactive. Ask if they'd like to continue our conversation.",
|
||||
}
|
||||
await user_idle.push_frame(LLMMessagesAppendFrame([message], run_llm=True))
|
||||
return True
|
||||
else:
|
||||
# Third attempt: End the conversation
|
||||
|
||||
@@ -14,6 +14,7 @@ from loguru import logger
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.audio.vad.vad_analyzer import VADParams
|
||||
from pipecat.frames.frames import TTSSpeakFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
@@ -34,7 +35,6 @@ load_dotenv(override=True)
|
||||
|
||||
|
||||
BASE_FILENAME = "/tmp/pipecat_conversation_"
|
||||
tts = None
|
||||
|
||||
|
||||
async def fetch_weather_from_api(params: FunctionCallParams):
|
||||
@@ -87,7 +87,7 @@ async def load_conversation(params: FunctionCallParams):
|
||||
logger.debug(
|
||||
f"loaded conversation from {filename}\n{json.dumps(params.context.messages, indent=4)}"
|
||||
)
|
||||
await tts.say("Ok, I've loaded that conversation.")
|
||||
await params.llm.queue_frame(TTSSpeakFrame("Ok, I've loaded that conversation."))
|
||||
except Exception as e:
|
||||
await params.result_callback({"success": False, "error": str(e)})
|
||||
|
||||
@@ -190,7 +190,7 @@ transport_params = {
|
||||
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
global tts
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
|
||||
@@ -14,6 +14,7 @@ from loguru import logger
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.audio.vad.vad_analyzer import VADParams
|
||||
from pipecat.frames.frames import TTSSpeakFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
@@ -88,7 +89,7 @@ async def load_conversation(params: FunctionCallParams):
|
||||
logger.debug(
|
||||
f"loaded conversation from {filename}\n{json.dumps(params.context.messages, indent=4)}"
|
||||
)
|
||||
await tts.say("Ok, I've loaded that conversation.")
|
||||
await params.llm.queue_frame(TTSSpeakFrame("Ok, I've loaded that conversation."))
|
||||
except Exception as e:
|
||||
await params.result_callback({"success": False, "error": str(e)})
|
||||
|
||||
|
||||
@@ -19,7 +19,6 @@ from pipecat.frames.frames import (
|
||||
Frame,
|
||||
FunctionCallInProgressFrame,
|
||||
FunctionCallResultFrame,
|
||||
LLMMessagesFrame,
|
||||
StartFrame,
|
||||
StartInterruptionFrame,
|
||||
StopInterruptionFrame,
|
||||
@@ -60,10 +59,6 @@ classifier_statement = "Determine if the user's statement ends with a complete t
|
||||
|
||||
|
||||
class StatementJudgeContextFilter(FrameProcessor):
|
||||
def __init__(self, notifier: BaseNotifier, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self._notifier = notifier
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
# We must not block system frames.
|
||||
@@ -71,13 +66,8 @@ class StatementJudgeContextFilter(FrameProcessor):
|
||||
await self.push_frame(frame, direction)
|
||||
return
|
||||
|
||||
# Just treat an LLMMessagesFrame as complete, no matter what.
|
||||
if isinstance(frame, LLMMessagesFrame):
|
||||
await self._notifier.notify()
|
||||
return
|
||||
|
||||
# Otherwise, we only want to handle OpenAILLMContextFrames, and only want to push a simple
|
||||
# messages frame that contains a system prompt and the most recent user messages,
|
||||
# We only want to handle OpenAILLMContextFrames, and only want to push through a simplified
|
||||
# context frame that contains a system prompt and the most recent user messages,
|
||||
# concatenated.
|
||||
if isinstance(frame, OpenAILLMContextFrame):
|
||||
logger.debug(f"Context Frame: {frame}")
|
||||
@@ -96,7 +86,7 @@ class StatementJudgeContextFilter(FrameProcessor):
|
||||
for content in message["content"]:
|
||||
if content["type"] == "text":
|
||||
user_text_messages.insert(0, content["text"])
|
||||
# If we have any user text content, push an LLMMessagesFrame
|
||||
# If we have any user text content, push a context frame with the simplified context.
|
||||
if user_text_messages:
|
||||
logger.debug(f"User text messages: {user_text_messages}")
|
||||
user_message = " ".join(reversed(user_text_messages))
|
||||
@@ -110,7 +100,7 @@ class StatementJudgeContextFilter(FrameProcessor):
|
||||
if last_assistant_message:
|
||||
messages.append(last_assistant_message)
|
||||
messages.append({"role": "user", "content": user_message})
|
||||
await self.push_frame(LLMMessagesFrame(messages))
|
||||
await self.push_frame(OpenAILLMContextFrame(OpenAILLMContext(messages)))
|
||||
|
||||
|
||||
class CompletenessCheck(FrameProcessor):
|
||||
@@ -296,7 +286,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
|
||||
# This turns the LLM context into an inference request to classify the user's speech
|
||||
# as complete or incomplete.
|
||||
statement_judge_context_filter = StatementJudgeContextFilter(notifier=notifier)
|
||||
statement_judge_context_filter = StatementJudgeContextFilter()
|
||||
|
||||
# This sends a UserStoppedSpeakingFrame and triggers the notifier event
|
||||
completeness_check = CompletenessCheck(notifier=notifier)
|
||||
@@ -316,7 +306,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
async def pass_only_llm_trigger_frames(frame):
|
||||
return (
|
||||
isinstance(frame, OpenAILLMContextFrame)
|
||||
or isinstance(frame, LLMMessagesFrame)
|
||||
or isinstance(frame, StartInterruptionFrame)
|
||||
or isinstance(frame, StopInterruptionFrame)
|
||||
or isinstance(frame, FunctionCallInProgressFrame)
|
||||
@@ -331,14 +320,14 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
ParallelPipeline(
|
||||
[
|
||||
# Ignore everything except an OpenAILLMContextFrame. Pass a specially constructed
|
||||
# LLMMessagesFrame to the statement classifier LLM. The only frame this
|
||||
# simplified context frame to the statement classifier LLM. The only frame this
|
||||
# sub-pipeline will output is a UserStoppedSpeakingFrame.
|
||||
statement_judge_context_filter,
|
||||
statement_llm,
|
||||
completeness_check,
|
||||
],
|
||||
[
|
||||
# Block everything except OpenAILLMContextFrame and LLMMessagesFrame
|
||||
# Block everything except frames that trigger LLM inference.
|
||||
FunctionFilter(filter=pass_only_llm_trigger_frames),
|
||||
llm,
|
||||
bot_output_gate, # Buffer all llm/tts output until notified.
|
||||
|
||||
@@ -19,7 +19,6 @@ from pipecat.frames.frames import (
|
||||
Frame,
|
||||
FunctionCallInProgressFrame,
|
||||
FunctionCallResultFrame,
|
||||
LLMMessagesFrame,
|
||||
StartFrame,
|
||||
StartInterruptionFrame,
|
||||
StopInterruptionFrame,
|
||||
@@ -266,10 +265,6 @@ Please be very concise in your responses. Unless you are explicitly asked to do
|
||||
|
||||
|
||||
class StatementJudgeContextFilter(FrameProcessor):
|
||||
def __init__(self, notifier: BaseNotifier, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self._notifier = notifier
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
# We must not block system frames.
|
||||
@@ -277,14 +272,8 @@ class StatementJudgeContextFilter(FrameProcessor):
|
||||
await self.push_frame(frame, direction)
|
||||
return
|
||||
|
||||
# Just treat an LLMMessagesFrame as complete, no matter what.
|
||||
if isinstance(frame, LLMMessagesFrame):
|
||||
await self._notifier.notify()
|
||||
return
|
||||
|
||||
# Otherwise, we only want to handle OpenAILLMContextFrames, and only want to push a simple
|
||||
# messages frame that contains a system prompt and the most recent user messages,
|
||||
# concatenated.
|
||||
# We only want to handle OpenAILLMContextFrames, and only want to push through a simplified
|
||||
# context frame that contains a system prompt and the most recent user messages,
|
||||
if isinstance(frame, OpenAILLMContextFrame):
|
||||
# Take text content from the most recent user messages.
|
||||
messages = frame.context.messages
|
||||
@@ -301,7 +290,7 @@ class StatementJudgeContextFilter(FrameProcessor):
|
||||
for content in message["content"]:
|
||||
if content["type"] == "text":
|
||||
user_text_messages.insert(0, content["text"])
|
||||
# If we have any user text content, push an LLMMessagesFrame
|
||||
# If we have any user text content, push a context frame with the simplified context.
|
||||
if user_text_messages:
|
||||
user_message = " ".join(reversed(user_text_messages))
|
||||
logger.debug(f"!!! {user_message}")
|
||||
@@ -314,7 +303,7 @@ class StatementJudgeContextFilter(FrameProcessor):
|
||||
if last_assistant_message:
|
||||
messages.append(last_assistant_message)
|
||||
messages.append({"role": "user", "content": user_message})
|
||||
await self.push_frame(LLMMessagesFrame(messages))
|
||||
await self.push_frame(OpenAILLMContextFrame(OpenAILLMContext(messages)))
|
||||
|
||||
|
||||
class CompletenessCheck(FrameProcessor):
|
||||
@@ -499,7 +488,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
|
||||
# This turns the LLM context into an inference request to classify the user's speech
|
||||
# as complete or incomplete.
|
||||
statement_judge_context_filter = StatementJudgeContextFilter(notifier=notifier)
|
||||
statement_judge_context_filter = StatementJudgeContextFilter()
|
||||
|
||||
# This sends a UserStoppedSpeakingFrame and triggers the notifier event
|
||||
completeness_check = CompletenessCheck(notifier=notifier)
|
||||
@@ -522,7 +511,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
async def pass_only_llm_trigger_frames(frame):
|
||||
return (
|
||||
isinstance(frame, OpenAILLMContextFrame)
|
||||
or isinstance(frame, LLMMessagesFrame)
|
||||
or isinstance(frame, StartInterruptionFrame)
|
||||
or isinstance(frame, StopInterruptionFrame)
|
||||
or isinstance(frame, FunctionCallInProgressFrame)
|
||||
@@ -542,14 +530,14 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
],
|
||||
[
|
||||
# Ignore everything except an OpenAILLMContextFrame. Pass a specially constructed
|
||||
# LLMMessagesFrame to the statement classifier LLM. The only frame this
|
||||
# simplified context frame to the statement classifier LLM. The only frame this
|
||||
# sub-pipeline will output is a UserStoppedSpeakingFrame.
|
||||
statement_judge_context_filter,
|
||||
statement_llm,
|
||||
completeness_check,
|
||||
],
|
||||
[
|
||||
# Block everything except OpenAILLMContextFrame and LLMMessagesFrame
|
||||
# Block everything except frames that trigger LLM inference.
|
||||
FunctionFilter(filter=pass_only_llm_trigger_frames),
|
||||
llm,
|
||||
bot_output_gate, # Buffer all llm/tts output until notified.
|
||||
|
||||
@@ -33,7 +33,7 @@ dependencies = [
|
||||
"pyloudnorm~=0.1.1",
|
||||
"resampy~=0.4.3",
|
||||
"soxr~=0.5.0",
|
||||
"openai>=1.74.0,<2",
|
||||
"openai>=1.74.0,<=1.99.1",
|
||||
]
|
||||
|
||||
[project.urls]
|
||||
|
||||
@@ -478,6 +478,12 @@ class TranscriptionUpdateFrame(DataFrame):
|
||||
class LLMMessagesFrame(DataFrame):
|
||||
"""Frame containing LLM messages for chat completion.
|
||||
|
||||
.. deprecated:: 0.0.79
|
||||
This class is deprecated and will be removed in a future version.
|
||||
Instead, use either:
|
||||
- `LLMMessagesUpdateFrame` with `run_llm=True`
|
||||
- `OpenAILLMContextFrame` with desired messages in a new context
|
||||
|
||||
A frame containing a list of LLM messages. Used to signal that an LLM
|
||||
service should run a chat completion and emit an LLMFullResponseStartFrame,
|
||||
TextFrames and an LLMFullResponseEndFrame. Note that the `messages`
|
||||
@@ -490,6 +496,20 @@ class LLMMessagesFrame(DataFrame):
|
||||
|
||||
messages: List[dict]
|
||||
|
||||
def __post_init__(self):
|
||||
super().__post_init__()
|
||||
import warnings
|
||||
|
||||
warnings.simplefilter("always")
|
||||
warnings.warn(
|
||||
"LLMMessagesFrame is deprecated and will be removed in a future version. "
|
||||
"Instead, use either "
|
||||
"`LLMMessagesUpdateFrame` with `run_llm=True`, or "
|
||||
"`OpenAILLMContextFrame` with desired messages in a new context",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class LLMMessagesAppendFrame(DataFrame):
|
||||
|
||||
@@ -12,6 +12,7 @@ LLM processing, and text-to-speech components in conversational AI pipelines.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import warnings
|
||||
from abc import abstractmethod
|
||||
from dataclasses import dataclass
|
||||
from typing import Dict, List, Literal, Optional, Set
|
||||
@@ -994,6 +995,10 @@ class LLMAssistantContextAggregator(LLMContextResponseAggregator):
|
||||
class LLMUserResponseAggregator(LLMUserContextAggregator):
|
||||
"""User response aggregator that outputs LLMMessagesFrame instead of context frames.
|
||||
|
||||
.. deprecated:: 0.0.79
|
||||
This class is deprecated and will be removed in a future version.
|
||||
Use `LLMUserContextAggregator` or another LLM-specific subclass instead.
|
||||
|
||||
This aggregator extends LLMUserContextAggregator but pushes LLMMessagesFrame
|
||||
objects downstream instead of OpenAILLMContextFrame objects. This is useful
|
||||
when you need message-based output rather than context-based output.
|
||||
@@ -1013,6 +1018,12 @@ class LLMUserResponseAggregator(LLMUserContextAggregator):
|
||||
params: Configuration parameters for aggregation behavior.
|
||||
**kwargs: Additional arguments passed to parent class.
|
||||
"""
|
||||
warnings.warn(
|
||||
"LLMUserResponseAggregator is deprecated and will be removed in a future version. "
|
||||
"Use LLMUserContextAggregator or another LLM-specific subclass instead.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
super().__init__(context=OpenAILLMContext(messages), params=params, **kwargs)
|
||||
|
||||
async def _process_aggregation(self):
|
||||
@@ -1027,6 +1038,10 @@ class LLMUserResponseAggregator(LLMUserContextAggregator):
|
||||
class LLMAssistantResponseAggregator(LLMAssistantContextAggregator):
|
||||
"""Assistant response aggregator that outputs LLMMessagesFrame instead of context frames.
|
||||
|
||||
.. deprecated:: 0.0.79
|
||||
This class is deprecated and will be removed in a future version.
|
||||
Use `LLMAssistantContextAggregator` or another LLM-specific subclass instead.
|
||||
|
||||
This aggregator extends LLMAssistantContextAggregator but pushes LLMMessagesFrame
|
||||
objects downstream instead of OpenAILLMContextFrame objects. This is useful
|
||||
when you need message-based output rather than context-based output.
|
||||
@@ -1046,6 +1061,12 @@ class LLMAssistantResponseAggregator(LLMAssistantContextAggregator):
|
||||
params: Configuration parameters for aggregation behavior.
|
||||
**kwargs: Additional arguments passed to parent class.
|
||||
"""
|
||||
warnings.warn(
|
||||
"LLMAssistantResponseAggregator is deprecated and will be removed in a future version. "
|
||||
"Use LLMAssistantContextAggregator or another LLM-specific subclass instead.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
super().__init__(context=OpenAILLMContext(messages), params=params, **kwargs)
|
||||
|
||||
async def push_aggregation(self):
|
||||
|
||||
@@ -12,13 +12,14 @@ in conversational pipelines.
|
||||
"""
|
||||
|
||||
from pipecat.frames.frames import TextFrame
|
||||
from pipecat.processors.aggregators.llm_response import LLMUserResponseAggregator
|
||||
from pipecat.processors.aggregators.llm_response import LLMUserContextAggregator
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
|
||||
|
||||
class UserResponseAggregator(LLMUserResponseAggregator):
|
||||
class UserResponseAggregator(LLMUserContextAggregator):
|
||||
"""Aggregates user responses into TextFrame objects.
|
||||
|
||||
This aggregator extends LLMUserResponseAggregator to specifically handle
|
||||
This aggregator extends LLMUserContextAggregator to specifically handle
|
||||
user input by collecting text responses and outputting them as TextFrame
|
||||
objects when the aggregation is complete.
|
||||
"""
|
||||
@@ -27,9 +28,9 @@ class UserResponseAggregator(LLMUserResponseAggregator):
|
||||
"""Initialize the user response aggregator.
|
||||
|
||||
Args:
|
||||
**kwargs: Additional arguments passed to parent LLMUserResponseAggregator.
|
||||
**kwargs: Additional arguments passed to parent LLMUserContextAggregator.
|
||||
"""
|
||||
super().__init__(**kwargs)
|
||||
super().__init__(context=OpenAILLMContext(), **kwargs)
|
||||
|
||||
async def push_aggregation(self):
|
||||
"""Push the aggregated user response as a TextFrame.
|
||||
|
||||
@@ -14,9 +14,9 @@ from pipecat.frames.frames import (
|
||||
Frame,
|
||||
LLMFullResponseEndFrame,
|
||||
LLMFullResponseStartFrame,
|
||||
LLMMessagesFrame,
|
||||
TextFrame,
|
||||
)
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
|
||||
try:
|
||||
@@ -64,11 +64,11 @@ class LangchainProcessor(FrameProcessor):
|
||||
"""
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, LLMMessagesFrame):
|
||||
if isinstance(frame, OpenAILLMContextFrame):
|
||||
# Messages are accumulated on the context as a list of messages.
|
||||
# The last one by the human is the one we want to send to the LLM.
|
||||
logger.debug(f"Got transcription frame {frame}")
|
||||
text: str = frame.messages[-1]["content"]
|
||||
text: str = frame.context.messages[-1]["content"]
|
||||
|
||||
await self._ainvoke(text.strip())
|
||||
else:
|
||||
|
||||
@@ -448,14 +448,16 @@ class AnthropicLLMContext(OpenAILLMContext):
|
||||
system: System message content.
|
||||
"""
|
||||
super().__init__(messages=messages, tools=tools, tool_choice=tool_choice)
|
||||
self.__setup_local()
|
||||
self.system = system
|
||||
|
||||
def __setup_local(self):
|
||||
# For beta prompt caching. This is a counter that tracks the number of turns
|
||||
# we've seen above the cache threshold. We reset this when we reset the
|
||||
# messages list. We only care about this number being 0, 1, or 2. But
|
||||
# it's easiest just to treat it as a counter.
|
||||
self.turns_above_cache_threshold = 0
|
||||
|
||||
self.system = system
|
||||
return
|
||||
|
||||
@staticmethod
|
||||
def upgrade_to_anthropic(obj: OpenAILLMContext) -> "AnthropicLLMContext":
|
||||
@@ -472,6 +474,7 @@ class AnthropicLLMContext(OpenAILLMContext):
|
||||
logger.debug(f"Upgrading to Anthropic: {obj}")
|
||||
if isinstance(obj, OpenAILLMContext) and not isinstance(obj, AnthropicLLMContext):
|
||||
obj.__class__ = AnthropicLLMContext
|
||||
obj.__setup_local()
|
||||
obj._restructure_from_openai_messages()
|
||||
return obj
|
||||
|
||||
|
||||
@@ -479,6 +479,7 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
|
||||
self._websocket = await websocket_connect(
|
||||
url, max_size=16 * 1024 * 1024, additional_headers={"xi-api-key": self._api_key}
|
||||
)
|
||||
logger.debug(f"{self}: WebSocket connected to ElevenLabs - ready to receive audio")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"{self} initialization error: {e}")
|
||||
@@ -513,7 +514,9 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
|
||||
|
||||
# Close the current context when interrupted without closing the websocket
|
||||
if self._context_id and self._websocket:
|
||||
logger.trace(f"Closing context {self._context_id} due to interruption")
|
||||
logger.debug(
|
||||
f"{self}: Closing context {self._context_id} due to interruption - this will stop audio stream"
|
||||
)
|
||||
try:
|
||||
# ElevenLabs requires that Pipecat manages the contexts and closes them
|
||||
# when they're not longer in use. Since a StartInterruptionFrame is pushed
|
||||
@@ -524,6 +527,7 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
|
||||
await self._websocket.send(
|
||||
json.dumps({"context_id": self._context_id, "close_context": True})
|
||||
)
|
||||
logger.debug(f"{self}: Sent close_context message for {self._context_id}")
|
||||
except Exception as e:
|
||||
logger.error(f"Error closing context on interruption: {e}")
|
||||
self._context_id = None
|
||||
@@ -534,15 +538,28 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
|
||||
async for message in WatchdogAsyncIterator(
|
||||
self._get_websocket(), manager=self.task_manager
|
||||
):
|
||||
# Log raw message structure for debugging (truncated for readability)
|
||||
message_preview = message[:500] + "..." if len(message) > 500 else message
|
||||
logger.debug(f"{self}: Raw WebSocket message preview: {message_preview}")
|
||||
|
||||
msg = json.loads(message)
|
||||
|
||||
received_ctx_id = msg.get("contextId")
|
||||
|
||||
# Log the message structure without the large audio data
|
||||
msg_structure = {
|
||||
k: (f"<{len(v)} chars>" if k == "audio" and isinstance(v, str) else v)
|
||||
for k, v in msg.items()
|
||||
}
|
||||
logger.debug(f"{self}: Parsed message structure: {msg_structure}")
|
||||
|
||||
# Handle final messages first, regardless of context availability
|
||||
# At the moment, this message is received AFTER the close_context message is
|
||||
# sent, so it doesn't serve any functional purpose. For now, we'll just log it.
|
||||
if msg.get("isFinal") is True:
|
||||
logger.trace(f"Received final message for context {received_ctx_id}")
|
||||
logger.debug(
|
||||
f"{self}: Received final message for context {received_ctx_id} - audio stream ended"
|
||||
)
|
||||
continue
|
||||
|
||||
# Check if this message belongs to the current context.
|
||||
@@ -564,7 +581,44 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
|
||||
self.start_word_timestamps()
|
||||
|
||||
audio = base64.b64decode(msg["audio"])
|
||||
|
||||
# Targeted logging for audio debugging
|
||||
audio_size = len(audio)
|
||||
|
||||
# More comprehensive audio analysis
|
||||
if audio_size > 0:
|
||||
# Sample first and last 8 bytes
|
||||
first_8 = audio[:8].hex() if audio_size >= 8 else audio.hex()
|
||||
last_8 = audio[-8:].hex() if audio_size >= 8 else ""
|
||||
|
||||
# Statistical analysis
|
||||
non_zero_count = sum(1 for b in audio if b != 0)
|
||||
non_zero_percent = (non_zero_count / audio_size * 100) if audio_size > 0 else 0
|
||||
|
||||
# Check for common silence patterns
|
||||
all_zeros = all(b == 0 for b in audio)
|
||||
mostly_zeros = non_zero_percent < 1.0 # Less than 1% non-zero
|
||||
|
||||
# Sample some values to check amplitude range
|
||||
max_val = max(audio) if audio else 0
|
||||
min_val = min(audio) if audio else 0
|
||||
|
||||
logger.debug(
|
||||
f"{self}: Audio received - size: {audio_size} bytes, "
|
||||
f"first_8: {first_8}, last_8: {last_8}, "
|
||||
f"non_zero: {non_zero_percent:.1f}%, all_zeros: {all_zeros}, "
|
||||
f"mostly_zeros: {mostly_zeros}, range: {min_val}-{max_val}, "
|
||||
f"context: {received_ctx_id}, cumulative_time: {self._cumulative_time:.3f}s"
|
||||
)
|
||||
else:
|
||||
logger.warning(
|
||||
f"{self}: Received empty audio data for context {received_ctx_id}"
|
||||
)
|
||||
|
||||
frame = TTSAudioRawFrame(audio, self.sample_rate, 1)
|
||||
logger.debug(
|
||||
f"[ELEVENLABS_AUDIO] Creating TTSAudioRawFrame with {len(audio)} bytes for context {received_ctx_id}"
|
||||
)
|
||||
await self.append_to_audio_context(received_ctx_id, frame)
|
||||
|
||||
if msg.get("alignment"):
|
||||
@@ -620,6 +674,9 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
|
||||
"""Send text to the WebSocket for synthesis."""
|
||||
if self._websocket and self._context_id:
|
||||
msg = {"text": text, "context_id": self._context_id}
|
||||
logger.debug(
|
||||
f"{self}: Sending text to ElevenLabs - length: {len(text)}, context: {self._context_id}"
|
||||
)
|
||||
await self._websocket.send(json.dumps(msg))
|
||||
|
||||
@traced_tts
|
||||
@@ -973,6 +1030,9 @@ class ElevenLabsHttpTTSService(WordTTSService):
|
||||
if data and "audio_base64" in data:
|
||||
await self.stop_ttfb_metrics()
|
||||
audio = base64.b64decode(data["audio_base64"])
|
||||
logger.debug(
|
||||
f"[ELEVENLABS_AUDIO] Yielding TTSAudioRawFrame with {len(audio)} bytes"
|
||||
)
|
||||
yield TTSAudioRawFrame(audio, self.sample_rate, 1)
|
||||
|
||||
# Process alignment if present
|
||||
|
||||
@@ -269,9 +269,20 @@ class TTSService(AIService):
|
||||
async def say(self, text: str):
|
||||
"""Immediately speak the provided text.
|
||||
|
||||
.. deprecated:: 0.0.79
|
||||
Push a `TTSSpeakFrame` instead to ensure frame ordering is maintained.
|
||||
|
||||
Args:
|
||||
text: The text to speak.
|
||||
"""
|
||||
import warnings
|
||||
|
||||
warnings.warn(
|
||||
"`TTSService.say()` is deprecated. Push a `TTSSpeakFrame` instead.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
await self.queue_frame(TTSSpeakFrame(text))
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
@@ -322,6 +333,9 @@ class TTSService(AIService):
|
||||
elif isinstance(frame, TTSUpdateSettingsFrame):
|
||||
await self._update_settings(frame.settings)
|
||||
elif isinstance(frame, BotStoppedSpeakingFrame):
|
||||
logger.warning(
|
||||
f"[TTS_RESUME] {self.__class__.__name__} received BotStoppedSpeakingFrame"
|
||||
)
|
||||
await self._maybe_resume_frame_processing()
|
||||
await self.push_frame(frame, direction)
|
||||
else:
|
||||
@@ -365,11 +379,15 @@ class TTSService(AIService):
|
||||
|
||||
async def _maybe_pause_frame_processing(self):
|
||||
if self._processing_text and self._pause_frame_processing:
|
||||
logger.warning(f"[TTS_PAUSE] {self.__class__.__name__} pausing frame processing")
|
||||
await self.pause_processing_frames()
|
||||
|
||||
async def _maybe_resume_frame_processing(self):
|
||||
if self._pause_frame_processing:
|
||||
logger.warning(f"[TTS_RESUME] {self.__class__.__name__} resuming frame processing")
|
||||
await self.resume_processing_frames()
|
||||
else:
|
||||
logger.debug(f"[TTS_RESUME] {self.__class__.__name__} resume called but not paused")
|
||||
|
||||
async def _process_text_frame(self, frame: TextFrame):
|
||||
text: Optional[str] = None
|
||||
|
||||
@@ -503,6 +503,9 @@ class BaseOutputTransport(FrameProcessor):
|
||||
num_channels=frame.num_channels,
|
||||
)
|
||||
chunk.transport_destination = self._destination
|
||||
logger.debug(
|
||||
f"[AUDIO_QUEUE] Putting {chunk.__class__.__name__} with {len(chunk.audio)} bytes into audio queue"
|
||||
)
|
||||
await self._audio_queue.put(chunk)
|
||||
self._audio_buffer = self._audio_buffer[self._audio_chunk_size :]
|
||||
|
||||
@@ -582,15 +585,18 @@ class BaseOutputTransport(FrameProcessor):
|
||||
async def _bot_stopped_speaking(self):
|
||||
"""Handle bot stopped speaking event."""
|
||||
if self._bot_speaking:
|
||||
logger.debug(
|
||||
f"Bot{f' [{self._destination}]' if self._destination else ''} stopped speaking"
|
||||
logger.warning(
|
||||
f"[BOT_STOPPED] Bot stopped speaking - sending BotStoppedSpeakingFrame upstream"
|
||||
)
|
||||
|
||||
downstream_frame = BotStoppedSpeakingFrame()
|
||||
downstream_frame.transport_destination = self._destination
|
||||
upstream_frame = BotStoppedSpeakingFrame()
|
||||
upstream_frame.transport_destination = self._destination
|
||||
|
||||
logger.debug(f"[BOT_STOPPED] Pushing downstream BotStoppedSpeakingFrame")
|
||||
await self._transport.push_frame(downstream_frame)
|
||||
logger.debug(f"[BOT_STOPPED] Pushing upstream BotStoppedSpeakingFrame")
|
||||
await self._transport.push_frame(upstream_frame, FrameDirection.UPSTREAM)
|
||||
|
||||
self._bot_speaking = False
|
||||
@@ -598,6 +604,8 @@ class BaseOutputTransport(FrameProcessor):
|
||||
# Clean audio buffer (there could be tiny left overs if not multiple
|
||||
# to our output chunk size).
|
||||
self._audio_buffer = bytearray()
|
||||
else:
|
||||
logger.debug(f"[BOT_STOPPED] _bot_stopped_speaking called but bot was not speaking")
|
||||
|
||||
async def _handle_frame(self, frame: Frame):
|
||||
"""Handle various frame types with appropriate processing.
|
||||
@@ -624,12 +632,24 @@ class BaseOutputTransport(FrameProcessor):
|
||||
async def without_mixer(vad_stop_secs: float) -> AsyncGenerator[Frame, None]:
|
||||
while True:
|
||||
try:
|
||||
logger.debug(
|
||||
f"[AUDIO_QUEUE] Waiting for audio frame (timeout={vad_stop_secs}s)"
|
||||
)
|
||||
start_time = time.time()
|
||||
frame = await asyncio.wait_for(
|
||||
self._audio_queue.get(), timeout=vad_stop_secs
|
||||
)
|
||||
wait_time = time.time() - start_time
|
||||
logger.debug(
|
||||
f"[AUDIO_QUEUE] Got frame {frame.__class__.__name__} after {wait_time:.3f}s"
|
||||
)
|
||||
self._transport.reset_watchdog()
|
||||
yield frame
|
||||
except asyncio.TimeoutError:
|
||||
wait_time = time.time() - start_time
|
||||
logger.warning(
|
||||
f"[AUDIO_QUEUE] TIMEOUT after {wait_time:.3f}s - triggering bot_stopped_speaking"
|
||||
)
|
||||
self._transport.reset_watchdog()
|
||||
# Notify the bot stopped speaking upstream if necessary.
|
||||
await self._bot_stopped_speaking()
|
||||
|
||||
@@ -12,7 +12,7 @@ from langchain_core.language_models import FakeStreamingListLLM
|
||||
from pipecat.frames.frames import (
|
||||
LLMFullResponseEndFrame,
|
||||
LLMFullResponseStartFrame,
|
||||
LLMMessagesFrame,
|
||||
OpenAILLMContextAssistantTimestampFrame,
|
||||
TextFrame,
|
||||
TranscriptionFrame,
|
||||
UserStartedSpeakingFrame,
|
||||
@@ -21,8 +21,12 @@ from pipecat.frames.frames import (
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.processors.aggregators.llm_response import (
|
||||
LLMAssistantAggregatorParams,
|
||||
LLMAssistantResponseAggregator,
|
||||
LLMUserResponseAggregator,
|
||||
LLMAssistantContextAggregator,
|
||||
LLMUserContextAggregator,
|
||||
)
|
||||
from pipecat.processors.aggregators.openai_llm_context import (
|
||||
OpenAILLMContext,
|
||||
OpenAILLMContextFrame,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameProcessor
|
||||
from pipecat.processors.frameworks.langchain import LangchainProcessor
|
||||
@@ -63,9 +67,10 @@ class TestLangchain(unittest.IsolatedAsyncioTestCase):
|
||||
proc = LangchainProcessor(chain=chain)
|
||||
self.mock_proc = self.MockProcessor("token_collector")
|
||||
|
||||
tma_in = LLMUserResponseAggregator(messages)
|
||||
tma_out = LLMAssistantResponseAggregator(
|
||||
messages, params=LLMAssistantAggregatorParams(expect_stripped_words=False)
|
||||
context = OpenAILLMContext()
|
||||
tma_in = LLMUserContextAggregator(context)
|
||||
tma_out = LLMAssistantContextAggregator(
|
||||
context, params=LLMAssistantAggregatorParams(expect_stripped_words=False)
|
||||
)
|
||||
|
||||
pipeline = Pipeline([tma_in, proc, self.mock_proc, tma_out])
|
||||
@@ -79,7 +84,8 @@ class TestLangchain(unittest.IsolatedAsyncioTestCase):
|
||||
expected_down_frames = [
|
||||
UserStartedSpeakingFrame,
|
||||
UserStoppedSpeakingFrame,
|
||||
LLMMessagesFrame,
|
||||
OpenAILLMContextFrame,
|
||||
OpenAILLMContextAssistantTimestampFrame,
|
||||
]
|
||||
await run_test(
|
||||
pipeline,
|
||||
|
||||
2
uv.lock
generated
2
uv.lock
generated
@@ -4364,7 +4364,7 @@ requires-dist = [
|
||||
{ name = "numpy", specifier = ">=1.26.4,<3" },
|
||||
{ name = "nvidia-riva-client", marker = "extra == 'riva'", specifier = "~=2.21.1" },
|
||||
{ name = "onnxruntime", marker = "extra == 'silero'", specifier = "~=1.20.1" },
|
||||
{ name = "openai", specifier = ">=1.74.0,<2" },
|
||||
{ name = "openai", specifier = ">=1.74.0,<=1.99.1" },
|
||||
{ name = "opencv-python", marker = "extra == 'webrtc'", specifier = "~=4.11.0.86" },
|
||||
{ name = "openpipe", marker = "extra == 'openpipe'", specifier = "~=4.50.0" },
|
||||
{ name = "opentelemetry-api", marker = "extra == 'tracing'", specifier = ">=1.33.0" },
|
||||
|
||||
Reference in New Issue
Block a user