Compare commits
72 Commits
hush/callT
...
v0.0.54
| Author | SHA1 | Date |
|---|---|---|
| | cd5075ed7a | |
| | 6f41a667c8 | |
| | 0b222a7eae | |
| | f09f4b8fc4 | |
| | cca241a2b7 | |
| | 1489e44740 | |
| | f55f78e70e | |
| | 10202dc529 | |
| | 498805a34c | |
| | 509f143e1b | |
| | 737e4fa3bd | |
| | 8b5228a105 | |
| | 6cc01bc5b0 | |
| | 2a2928d96c | |
| | a3a6adbd17 | |
| | bf5ced18b2 | |
| | 2eccd1b1e9 | |
| | 9374bed878 | |
| | c03d0352b1 | |
| | af90b8b4fa | |
| | 0a9daa2f56 | |
| | e48c0e52ef | |
| | 6bca8396d3 | |
| | c2d8a45a07 | |
| | 80a7f1b1e7 | |
| | aff6e24560 | |
| | cb93f6b368 | |
| | ff0bcec33a | |
| | 5885fcc230 | |
| | 57b186cde8 | |
| | d1a3f404a5 | |
| | 179ddbea7d | |
| | 86c1e6a3bd | |
| | 9e9822f17d | |
| | 5f9671e2ca | |
| | aac8961ae5 | |
| | 3e6377346a | |
| | 9d9a622b1a | |
| | 3e9a6b6262 | |
| | fb3097560f | |
| | ff6368add0 | |
| | 89fd03d86f | |
| | 0672530d6b | |
| | 7a0cfc8d3d | |
| | b881dd57b3 | |
| | abf0d0d053 | |
| | 1acdf7aff7 | |
| | 96b90abda6 | |
| | 202a844eeb | |
| | 655d56f634 | |
| | 07c84b733b | |
| | 7c52736ff6 | |
| | 48ce751602 | |
| | 1f1e2dac2b | |
| | 71c2dc3d05 | |
| | ef02ece662 | |
| | d5818fad5b | |
| | dbea86baae | |
| | c5faac1cf8 | |
| | e106d7a215 | |
| | 40c1a8369a | |
| | 6ab2404a98 | |
| | e61c996a2e | |
| | 2c81dc1f06 | |
| | 53251dcb88 | |
| | d4e4b12109 | |
| | 466d26a4f2 | |
| | ef511d580d | |
| | 5957ddb038 | |
| | 799c2d14b8 | |
| | dee1224530 | |
| | 9b61633aa0 | |
37
CHANGELOG.md
@@ -5,15 +5,40 @@ All notable changes to **Pipecat** will be documented in this file.
|
||||
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
||||
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||
|
||||
## [Unreleased]
|
||||
## [0.0.54] - 2025-01-27
|
||||
|
||||
### Added
|
||||
|
||||
- To create tasks in Pipecat frame processors, it is now recommended to
use `FrameProcessor.create_task()` (which uses the new
`utils.asyncio.create_task()`). It takes care of uncaught exceptions, task
cancellation handling and task management. To cancel or wait for a task, use
`FrameProcessor.cancel_task()` and `FrameProcessor.wait_for_task()`. All
Pipecat processors have been updated accordingly. Also, when a pipeline runner
finishes, a warning about dangling tasks might appear, indicating that one or
more of the created tasks were never cancelled or awaited (using these new
functions).
|
||||
|
||||
- It is now possible to specify the period of the `PipelineTask` heartbeat
|
||||
frames with `heartbeats_period_secs`.
|
||||
|
||||
- Added `DailyMeetingTokenProperties` and `DailyMeetingTokenParams` Pydantic models
for meeting token creation in the `get_token` method of `DailyRESTHelper`.
|
||||
|
||||
- Added `enable_recording` and `geo` parameters to `DailyRoomProperties`.
|
||||
|
||||
- Added `RecordingsBucketConfig` to `DailyRoomProperties` to upload recordings to a custom AWS bucket.
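
As a rough illustration of what the task helpers in the first entry above take care of, here is a minimal sketch in plain `asyncio`. `TaskManager` and `dangling_tasks()` are hypothetical names invented for this example. This is not Pipecat's implementation, only an assumption about the general pattern: wrap the coroutine, log uncaught exceptions, and track tasks until they are cancelled or awaited.

```python
import asyncio
import logging
from typing import Coroutine, List, Optional, Set

logger = logging.getLogger("task_sketch")


class TaskManager:
    """Hypothetical stand-in for the helpers described above; not Pipecat code."""

    def __init__(self) -> None:
        # Tasks that were created but not yet cancelled or awaited.
        self._tasks: Set[asyncio.Task] = set()

    def create_task(self, coro: Coroutine, name: str) -> asyncio.Task:
        async def runner():
            try:
                return await coro
            except asyncio.CancelledError:
                # Cancellation is expected; re-raise so the task ends as cancelled.
                raise
            except Exception:
                # Log the error instead of letting it go unnoticed.
                logger.exception("%s: uncaught exception", name)

        task = asyncio.get_running_loop().create_task(runner(), name=name)
        self._tasks.add(task)
        return task

    async def cancel_task(self, task: asyncio.Task, timeout: Optional[float] = None) -> None:
        task.cancel()
        try:
            await asyncio.wait_for(task, timeout)
        except (asyncio.CancelledError, asyncio.TimeoutError):
            # Simplification: this also swallows a cancellation of the caller.
            pass
        finally:
            self._tasks.discard(task)

    async def wait_for_task(self, task: asyncio.Task, timeout: Optional[float] = None) -> None:
        try:
            await asyncio.wait_for(task, timeout)
        finally:
            self._tasks.discard(task)

    def dangling_tasks(self) -> List[str]:
        # Names of tasks that were never cancelled or awaited through this manager.
        return sorted(t.get_name() for t in self._tasks)
```

In this sketch, a runner could call `dangling_tasks()` at shutdown and log a warning if the list is not empty, which mirrors the dangling-task warning mentioned in the entry.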
|
||||
|
||||
### Changed
|
||||
|
||||
- Enhanced `UserIdleProcessor` with retry functionality and control over idle
monitoring via a new callback signature `(processor, retry_count) -> bool`.
Updated the `17-detect-user-idle.py` example to show how to use `retry_count`.
|
||||
|
||||
- Added defensive error handling for `OpenAIRealtimeBetaLLMService`'s audio
truncation. Audio truncation errors during interruptions now log a warning
and allow the session to continue instead of raising an exception.
|
||||
|
||||
- Modified `TranscriptProcessor` to use TTS text frames for more accurate assistant
|
||||
transcripts. Assistant messages are now aggregated based on bot speaking boundaries
|
||||
rather than LLM context, providing better handling of interruptions and partial
|
||||
@@ -26,11 +51,21 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fixed a `GeminiMultimodalLiveLLMService` issue that prevented the user from
pushing initial LLM assistant messages (using `LLMMessagesAppendFrame`).
|
||||
|
||||
- Added missing `FrameProcessor.cleanup()` calls to `Pipeline`,
|
||||
`ParallelPipeline` and `UserIdleProcessor`.
|
||||
|
||||
- Fixed a type error when using `voice_settings` in `ElevenLabsHttpTTSService`.
|
||||
|
||||
- Fixed an issue where `OpenAIRealtimeBetaLLMService` function calling resulted
|
||||
in an error.
|
||||
|
||||
- Fixed an issue in `AudioBufferProcessor` where the last audio buffer was not
|
||||
being processed, in cases where the `_user_audio_buffer` was smaller than the
|
||||
buffer size.
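
The `AudioBufferProcessor` entry above concerns a trailing buffer that is shorter than the configured buffer size. A minimal sketch of that general pattern follows, assuming a hypothetical `ChunkedAudioBuffer` class written for this example (it is not Pipecat code and does not reproduce the actual fix): full chunks are emitted as audio arrives, and an explicit `flush()` emits whatever is left at the end.

```python
from typing import List


class ChunkedAudioBuffer:
    """Hypothetical buffer that emits fixed-size chunks plus a final remainder."""

    def __init__(self, buffer_size: int) -> None:
        self._buffer_size = buffer_size
        self._buffer = bytearray()
        self.chunks: List[bytes] = []

    def add_audio(self, audio: bytes) -> None:
        # Emit every complete chunk; keep the remainder for later.
        self._buffer.extend(audio)
        while len(self._buffer) >= self._buffer_size:
            self.chunks.append(bytes(self._buffer[: self._buffer_size]))
            del self._buffer[: self._buffer_size]

    def flush(self) -> None:
        # Without this step, a remainder smaller than buffer_size would be lost.
        if self._buffer:
            self.chunks.append(bytes(self._buffer))
            self._buffer.clear()


buf = ChunkedAudioBuffer(buffer_size=4)
buf.add_audio(b"abcdefghij")
buf.flush()  # emits the 2-byte remainder as the last chunk
```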
|
||||
|
||||
### Performance
|
||||
|
||||
- Replaced audio resampling library `resampy` with `soxr`. Resampling a 2:21s
|
||||
|
||||
@@ -53,7 +53,7 @@ To keep things lightweight, only the core framework is included by default. If y
|
||||
pip install "pipecat-ai[option,...]"
|
||||
```
|
||||
|
||||
Available options include:
|
||||
### Available services
|
||||
|
||||
| Category | Services | Install Command Example |
|
||||
| ------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | --------------------------------------- |
|
||||
|
||||
@@ -39,7 +39,7 @@ Next, follow the steps in the README for each demo.
|
||||
| [Translation Chatbot](translation-chatbot) | Listens for user speech, then translates that speech to Spanish and speaks the translation back. Demonstrates multi-participant use-cases. | Deepgram, Azure, OpenAI, Daily, Daily Prebuilt UI |
|
||||
| [Moondream Chatbot](moondream-chatbot) | Demonstrates how to add vision capabilities to GPT-4. **Note: works best with a GPU** | Deepgram, ElevenLabs, OpenAI, Moondream, Daily, Daily Prebuilt UI |
|
||||
| [Patient intake](patient-intake) | A chatbot that can call functions in response to user input. | Deepgram, ElevenLabs, OpenAI, Daily, Daily Prebuilt UI |
|
||||
| [Dialin Chatbot](dialin-chatbot) | A chatbot that connects to an incoming phone call from Daily or Twilio. | Deepgram, ElevenLabs, OpenAI, Daily, Twilio |
|
||||
| [Phone Chatbot](phone-chatbot) | A chatbot that connects to PSTN/SIP phone calls, powered by Daily or Twilio. | Deepgram, ElevenLabs, OpenAI, Daily, Twilio |
|
||||
| [Twilio Chatbot](twilio-chatbot) | A chatbot that connects to an incoming phone call from Twilio. | Deepgram, ElevenLabs, OpenAI, Daily, Twilio |
|
||||
| [studypal](studypal) | A chatbot to have a conversation about any article on the web | |
|
||||
| [WebSocket Chatbot Server](websocket-server) | A real-time websocket server that handles audio streaming and bot interactions with speech-to-text and text-to-speech capabilities. | Cartesia, Deepgram, OpenAI, Websockets |
|
||||
|
||||
@@ -53,4 +53,3 @@ async def configure(aiohttp_session: aiohttp.ClientSession):
|
||||
token = await daily_rest_helper.get_token(url, expiry_time)
|
||||
|
||||
return (url, token)
|
||||
return (url, token)
|
||||
|
||||
@@ -1,85 +0,0 @@
|
||||
<div align="center">
|
||||
<img alt="pipecat" width="300px" height="auto" src="image.png">
|
||||
</div>
|
||||
|
||||
# Dialin example
|
||||
|
||||
Example project that demonstrates how to add phone number dialin to your Pipecat bots. We include examples for both Daily (`bot_daily.py`) and Twilio (`bot_twilio.py`), depending on who you want to use as a phone vendor.
|
||||
|
||||
- 🔁 Transport: Daily WebRTC
|
||||
- 💬 Speech-to-Text: Deepgram via Daily transport
|
||||
- 🤖 LLM: GPT-4o / OpenAI
|
||||
- 🔉 Text-to-Speech: ElevenLabs
|
||||
|
||||
#### Should I use Daily or Twilio as a vendor?
|
||||
|
||||
If you're starting from scratch, using Daily to provision phone numbers alongside Daily as a transport offers some convenience (such as automatic call forwarding).
|
||||
|
||||
If you already have Twilio numbers and workflows that you want to connect to your Pipecat bots, some additional configuration is required (you'll need to create an `on_dialin_ready` handler and use the Twilio client to trigger the forward).
|
||||
|
||||
You can read more about this, and find walkthroughs for each vendor, in our docs.
|
||||
|
||||
## Setup
|
||||
|
||||
```shell
|
||||
# Install the requirements
|
||||
pip install -r requirements.txt
|
||||
|
||||
# Set up your env
|
||||
mv env.example .env
|
||||
```
|
||||
|
||||
## Using Daily numbers
|
||||
|
||||
Run `bot_runner.py` to handle incoming HTTP requests:
|
||||
|
||||
`python bot_runner.py --host localhost`
|
||||
|
||||
Then target the following URL:
|
||||
|
||||
`POST /daily_start_bot`
|
||||
|
||||
For more configuration options, please consult Daily's API documentation.
|
||||
|
||||
|
||||
## Using Twilio numbers
|
||||
|
||||
As above, but target the following URL:
|
||||
|
||||
`POST /twilio_start_bot`
|
||||
|
||||
For more configuration options, please consult Twilio's API documentation.
|
||||
|
||||
## Deployment example
|
||||
|
||||
A Dockerfile is included in this demo for convenience. Here is an example of how to build and deploy your bot to [fly.io](https://fly.io).
|
||||
|
||||
*Please note: This demo spawns agents as subprocesses for convenience / demonstration purposes. You would likely not want to do this in production as it would limit concurrency to available system resources. For more information on how to deploy your bots using VMs, refer to the Pipecat documentation.*
|
||||
|
||||
### Build the docker image
|
||||
|
||||
`docker build -t tag:project .`
|
||||
|
||||
### Launch the fly project
|
||||
|
||||
`mv fly.example.toml fly.toml`
|
||||
|
||||
`fly launch` (using the included fly.toml)
|
||||
|
||||
### Set up your secrets on Fly
|
||||
|
||||
Set the necessary secrets (found in `env.example`)
|
||||
|
||||
`fly secrets set DAILY_API_KEY=... OPENAI_API_KEY=... ELEVENLABS_API_KEY=... ELEVENLABS_VOICE_ID=...`
|
||||
|
||||
If you're using Twilio as a number vendor:
|
||||
|
||||
`fly secrets set TWILIO_ACCOUNT_SID=... TWILIO_AUTH_TOKEN=...`
|
||||
|
||||
### Deploy!
|
||||
|
||||
`fly deploy`
|
||||
|
||||
## Need to do something more advanced?
|
||||
|
||||
This demo covers the basics of bot telephony. If you want to know more about working with PSTN / SIP, please ping us on [Discord](https://discord.gg/pipecat).
|
||||
@@ -15,10 +15,17 @@ from PIL import Image
|
||||
from runner import configure
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import EndFrame, Frame, OutputImageRawFrame, SystemFrame, TextFrame
|
||||
from pipecat.frames.frames import (
|
||||
BotStartedSpeakingFrame,
|
||||
BotStoppedSpeakingFrame,
|
||||
EndFrame,
|
||||
Frame,
|
||||
OutputImageRawFrame,
|
||||
TextFrame,
|
||||
)
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineTask
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.services.cartesia import CartesiaHttpTTSService
|
||||
@@ -45,7 +52,7 @@ class ImageSyncAggregator(FrameProcessor):
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if not isinstance(frame, SystemFrame) and direction == FrameDirection.DOWNSTREAM:
|
||||
if isinstance(frame, BotStartedSpeakingFrame):
|
||||
await self.push_frame(
|
||||
OutputImageRawFrame(
|
||||
image=self._speaking_image_bytes,
|
||||
@@ -53,7 +60,8 @@ class ImageSyncAggregator(FrameProcessor):
|
||||
format=self._speaking_image_format,
|
||||
)
|
||||
)
|
||||
await self.push_frame(frame)
|
||||
|
||||
elif isinstance(frame, BotStoppedSpeakingFrame):
|
||||
await self.push_frame(
|
||||
OutputImageRawFrame(
|
||||
image=self._waiting_image_bytes,
|
||||
@@ -61,8 +69,8 @@ class ImageSyncAggregator(FrameProcessor):
|
||||
format=self._waiting_image_format,
|
||||
)
|
||||
)
|
||||
else:
|
||||
await self.push_frame(frame)
|
||||
|
||||
await self.push_frame(frame)
|
||||
|
||||
|
||||
async def main():
|
||||
@@ -109,16 +117,24 @@ async def main():
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(),
|
||||
image_sync_aggregator,
|
||||
context_aggregator.user(),
|
||||
llm,
|
||||
tts,
|
||||
image_sync_aggregator,
|
||||
transport.output(),
|
||||
context_aggregator.assistant(),
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(pipeline)
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
report_only_initial_ttfb=True,
|
||||
),
|
||||
)
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
|
||||
@@ -15,6 +15,7 @@ from openai.types.chat import ChatCompletionToolParam
|
||||
from runner import configure
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import TTSSpeakFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
@@ -29,11 +30,8 @@ logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
async def start_fetch_weather(function_name, llm, context):
|
||||
# note: we can't push a frame to the LLM here. the bot
|
||||
# can interrupt itself and/or cause audio overlapping glitches.
|
||||
# possible question for Aleix and Chad about what the right way
|
||||
# to trigger speech is, now, with the new queues/async/sync refactors.
|
||||
# await llm.push_frame(TextFrame("Let me check on that."))
|
||||
"""Push a frame to the LLM; this is handy when the LLM response might take a while."""
|
||||
await llm.push_frame(TTSSpeakFrame("Let me check on that."))
|
||||
logger.debug(f"Starting fetch_weather_from_api with function_name: {function_name}")
|
||||
|
||||
|
||||
|
||||
@@ -15,6 +15,7 @@ from openai.types.chat import ChatCompletionToolParam
|
||||
from runner import configure
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import TTSSpeakFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineTask
|
||||
@@ -30,11 +31,8 @@ logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
async def start_fetch_weather(function_name, llm, context):
|
||||
# note: we can't push a frame to the LLM here. the bot
|
||||
# can interrupt itself and/or cause audio overlapping glitches.
|
||||
# possible question for Aleix and Chad about what the right way
|
||||
# to trigger speech is, now, with the new queues/async/sync refactors.
|
||||
# await llm.push_frame(TextFrame("Let me check on that."))
|
||||
"""Push a frame to the LLM; this is handy when the LLM response might take a while."""
|
||||
await llm.push_frame(TTSSpeakFrame("Let me check on that."))
|
||||
logger.debug(f"Starting fetch_weather_from_api with function_name: {function_name}")
|
||||
|
||||
|
||||
|
||||
@@ -15,6 +15,7 @@ from openai.types.chat import ChatCompletionToolParam
|
||||
from runner import configure
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import TTSSpeakFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
@@ -30,11 +31,8 @@ logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
async def start_fetch_weather(function_name, llm, context):
|
||||
# note: we can't push a frame to the LLM here. the bot
|
||||
# can interrupt itself and/or cause audio overlapping glitches.
|
||||
# possible question for Aleix and Chad about what the right way
|
||||
# to trigger speech is, now, with the new queues/async/sync refactors.
|
||||
# await llm.push_frame(TextFrame("Let me check on that."))
|
||||
"""Push a frame to the LLM; this is handy when the LLM response might take a while."""
|
||||
await llm.push_frame(TTSSpeakFrame("Let me check on that."))
|
||||
logger.debug(f"Starting fetch_weather_from_api with function_name: {function_name}")
|
||||
|
||||
|
||||
|
||||
@@ -15,6 +15,7 @@ from openai.types.chat import ChatCompletionToolParam
|
||||
from runner import configure
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import TTSSpeakFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
@@ -30,11 +31,8 @@ logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
async def start_fetch_weather(function_name, llm, context):
|
||||
# note: we can't push a frame to the LLM here. the bot
|
||||
# can interrupt itself and/or cause audio overlapping glitches.
|
||||
# possible question for Aleix and Chad about what the right way
|
||||
# to trigger speech is, now, with the new queues/async/sync refactors.
|
||||
# await llm.push_frame(TextFrame("Let me check on that."))
|
||||
"""Push a frame to the LLM; this is handy when the LLM response might take a while."""
|
||||
await llm.push_frame(TTSSpeakFrame("Let me check on that."))
|
||||
logger.debug(f"Starting fetch_weather_from_api with function_name: {function_name}")
|
||||
|
||||
|
||||
|
||||
@@ -15,6 +15,7 @@ from openai.types.chat import ChatCompletionToolParam
|
||||
from runner import configure
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import TTSSpeakFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
@@ -30,11 +31,8 @@ logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
async def start_fetch_weather(function_name, llm, context):
|
||||
# note: we can't push a frame to the LLM here. the bot
|
||||
# can interrupt itself and/or cause audio overlapping glitches.
|
||||
# possible question for Aleix and Chad about what the right way
|
||||
# to trigger speech is, now, with the new queues/async/sync refactors.
|
||||
# await llm.push_frame(TextFrame("Let me check on that."))
|
||||
"""Push a frame to the LLM; this is handy when the LLM response might take a while."""
|
||||
await llm.push_frame(TTSSpeakFrame("Let me check on that."))
|
||||
logger.debug(f"Starting fetch_weather_from_api with function_name: {function_name}")
|
||||
|
||||
|
||||
|
||||
@@ -15,6 +15,7 @@ from openai.types.chat import ChatCompletionToolParam
|
||||
from runner import configure
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import TTSSpeakFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
@@ -30,11 +31,8 @@ logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
async def start_fetch_weather(function_name, llm, context):
|
||||
# note: we can't push a frame to the LLM here. the bot
|
||||
# can interrupt itself and/or cause audio overlapping glitches.
|
||||
# possible question for Aleix and Chad about what the right way
|
||||
# to trigger speech is, now, with the new queues/async/sync refactors.
|
||||
# await llm.push_frame(TextFrame("Let me check on that."))
|
||||
"""Push a frame to the LLM; this is handy when the LLM response might take a while."""
|
||||
await llm.push_frame(TTSSpeakFrame("Let me check on that."))
|
||||
logger.debug(f"Starting fetch_weather_from_api with function_name: {function_name}")
|
||||
|
||||
|
||||
|
||||
@@ -15,6 +15,7 @@ from openai.types.chat import ChatCompletionToolParam
|
||||
from runner import configure
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import TTSSpeakFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
@@ -30,11 +31,8 @@ logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
async def start_fetch_weather(function_name, llm, context):
|
||||
# note: we can't push a frame to the LLM here. the bot
|
||||
# can interrupt itself and/or cause audio overlapping glitches.
|
||||
# possible question for Aleix and Chad about what the right way
|
||||
# to trigger speech is, now, with the new queues/async/sync refactors.
|
||||
# await llm.push_frame(TextFrame("Let me check on that."))
|
||||
"""Push a frame to the LLM; this is handy when the LLM response might take a while."""
|
||||
await llm.push_frame(TTSSpeakFrame("Let me check on that."))
|
||||
logger.debug(f"Starting fetch_weather_from_api with function_name: {function_name}")
|
||||
|
||||
|
||||
|
||||
@@ -15,6 +15,7 @@ from openai.types.chat import ChatCompletionToolParam
|
||||
from runner import configure
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import TTSSpeakFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
@@ -30,11 +31,8 @@ logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
async def start_fetch_weather(function_name, llm, context):
|
||||
# note: we can't push a frame to the LLM here. the bot
|
||||
# can interrupt itself and/or cause audio overlapping glitches.
|
||||
# possible question for Aleix and Chad about what the right way
|
||||
# to trigger speech is, now, with the new queues/async/sync refactors.
|
||||
# await llm.push_frame(TextFrame("Let me check on that."))
|
||||
"""Push a frame to the LLM; this is handy when the LLM response might take a while."""
|
||||
await llm.push_frame(TTSSpeakFrame("Let me check on that."))
|
||||
logger.debug(f"Starting fetch_weather_from_api with function_name: {function_name}")
|
||||
|
||||
|
||||
@@ -95,7 +93,7 @@ async def main():
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": """You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way.
|
||||
"content": """You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way.
|
||||
|
||||
You have one function available:
|
||||
|
||||
|
||||
@@ -15,6 +15,7 @@ from openai.types.chat import ChatCompletionToolParam
|
||||
from runner import configure
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import TTSSpeakFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
@@ -30,11 +31,8 @@ logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
async def start_fetch_weather(function_name, llm, context):
|
||||
# note: we can't push a frame to the LLM here. the bot
|
||||
# can interrupt itself and/or cause audio overlapping glitches.
|
||||
# possible question for Aleix and Chad about what the right way
|
||||
# to trigger speech is, now, with the new queues/async/sync refactors.
|
||||
# await llm.push_frame(TextFrame("Let me check on that."))
|
||||
"""Push a frame to the LLM; this is handy when the LLM response might take a while."""
|
||||
await llm.push_frame(TTSSpeakFrame("Let me check on that."))
|
||||
logger.debug(f"Starting fetch_weather_from_api with function_name: {function_name}")
|
||||
|
||||
|
||||
@@ -95,7 +93,7 @@ async def main():
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": """You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way.
|
||||
"content": """You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way.
|
||||
|
||||
You have one function available:
|
||||
|
||||
|
||||
@@ -15,6 +15,7 @@ from openai.types.chat import ChatCompletionToolParam
|
||||
from runner import configure
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import TTSSpeakFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
@@ -30,11 +31,8 @@ logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
async def start_fetch_weather(function_name, llm, context):
|
||||
# note: we can't push a frame to the LLM here. the bot
|
||||
# can interrupt itself and/or cause audio overlapping glitches.
|
||||
# possible question for Aleix and Chad about what the right way
|
||||
# to trigger speech is, now, with the new queues/async/sync refactors.
|
||||
# await llm.push_frame(TextFrame("Let me check on that."))
|
||||
"""Push a frame to the LLM; this is handy when the LLM response might take a while."""
|
||||
await llm.push_frame(TTSSpeakFrame("Let me check on that."))
|
||||
logger.debug(f"Starting fetch_weather_from_api with function_name: {function_name}")
|
||||
|
||||
|
||||
|
||||
@@ -14,7 +14,7 @@ from loguru import logger
|
||||
from runner import configure
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import LLMMessagesFrame
|
||||
from pipecat.frames.frames import EndFrame, LLMMessagesFrame, TTSSpeakFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
@@ -63,16 +63,36 @@ async def main():
|
||||
context = OpenAILLMContext(messages)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
async def user_idle_callback(user_idle: UserIdleProcessor):
|
||||
messages.append(
|
||||
{
|
||||
"role": "system",
|
||||
"content": "Ask the user if they are still there and try to prompt for some input, but be short.",
|
||||
}
|
||||
)
|
||||
await user_idle.push_frame(LLMMessagesFrame(messages))
|
||||
async def handle_user_idle(user_idle: UserIdleProcessor, retry_count: int) -> bool:
|
||||
if retry_count == 1:
|
||||
# First attempt: Add a gentle prompt to the conversation
|
||||
messages.append(
|
||||
{
|
||||
"role": "system",
|
||||
"content": "The user has been quiet. Politely and briefly ask if they're still there.",
|
||||
}
|
||||
)
|
||||
await user_idle.push_frame(LLMMessagesFrame(messages))
|
||||
return True
|
||||
elif retry_count == 2:
|
||||
# Second attempt: More direct prompt
|
||||
messages.append(
|
||||
{
|
||||
"role": "system",
|
||||
"content": "The user is still inactive. Ask if they'd like to continue our conversation.",
|
||||
}
|
||||
)
|
||||
await user_idle.push_frame(LLMMessagesFrame(messages))
|
||||
return True
|
||||
else:
|
||||
# Third attempt: End the conversation
|
||||
await user_idle.push_frame(
|
||||
TTSSpeakFrame("It seems like you're busy right now. Have a nice day!")
|
||||
)
|
||||
await task.queue_frame(EndFrame())
|
||||
return False
|
||||
|
||||
user_idle = UserIdleProcessor(callback=user_idle_callback, timeout=5.0)
|
||||
user_idle = UserIdleProcessor(callback=handle_user_idle, timeout=5.0)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
|
||||
@@ -169,8 +169,7 @@ class OutputGate(FrameProcessor):
|
||||
self._gate_task = self.get_event_loop().create_task(self._gate_task_handler())
|
||||
|
||||
async def _stop(self):
|
||||
self._gate_task.cancel()
|
||||
await self._gate_task
|
||||
await self.cancel_task(self._gate_task)
|
||||
|
||||
async def _gate_task_handler(self):
|
||||
while True:
|
||||
|
||||
@@ -101,12 +101,12 @@ HIGH PRIORITY SIGNALS:
|
||||
|
||||
Examples:
|
||||
# Complete Wh-question
|
||||
[{"role": "assistant", "content": "I can help you learn."},
|
||||
[{"role": "assistant", "content": "I can help you learn."},
|
||||
{"role": "user", "content": "What's the fastest way to learn Spanish"}]
|
||||
Output: YES
|
||||
|
||||
# Complete Yes/No question despite STT error
|
||||
[{"role": "assistant", "content": "I know about planets."},
|
||||
[{"role": "assistant", "content": "I know about planets."},
|
||||
{"role": "user", "content": "Is is Jupiter the biggest planet"}]
|
||||
Output: YES
|
||||
|
||||
@@ -118,12 +118,12 @@ Output: YES
|
||||
|
||||
Examples:
|
||||
# Direct instruction
|
||||
[{"role": "assistant", "content": "I can explain many topics."},
|
||||
[{"role": "assistant", "content": "I can explain many topics."},
|
||||
{"role": "user", "content": "Tell me about black holes"}]
|
||||
Output: YES
|
||||
|
||||
# Action demand
|
||||
[{"role": "assistant", "content": "I can help with math."},
|
||||
[{"role": "assistant", "content": "I can help with math."},
|
||||
{"role": "user", "content": "Solve this equation x plus 5 equals 12"}]
|
||||
Output: YES
|
||||
|
||||
@@ -134,12 +134,12 @@ Output: YES
|
||||
|
||||
Examples:
|
||||
# Specific answer
|
||||
[{"role": "assistant", "content": "What's your favorite color?"},
|
||||
[{"role": "assistant", "content": "What's your favorite color?"},
|
||||
{"role": "user", "content": "I really like blue"}]
|
||||
Output: YES
|
||||
|
||||
# Option selection
|
||||
[{"role": "assistant", "content": "Would you prefer morning or evening?"},
|
||||
[{"role": "assistant", "content": "Would you prefer morning or evening?"},
|
||||
{"role": "user", "content": "Morning"}]
|
||||
Output: YES
|
||||
|
||||
@@ -153,17 +153,17 @@ MEDIUM PRIORITY SIGNALS:
|
||||
|
||||
Examples:
|
||||
# Self-correction reaching completion
|
||||
[{"role": "assistant", "content": "What would you like to know?"},
|
||||
[{"role": "assistant", "content": "What would you like to know?"},
|
||||
{"role": "user", "content": "Tell me about... no wait, explain how rainbows form"}]
|
||||
Output: YES
|
||||
|
||||
# Topic change with complete thought
|
||||
[{"role": "assistant", "content": "The weather is nice today."},
|
||||
[{"role": "assistant", "content": "The weather is nice today."},
|
||||
{"role": "user", "content": "Actually can you tell me who invented the telephone"}]
|
||||
Output: YES
|
||||
|
||||
# Mid-sentence completion
|
||||
[{"role": "assistant", "content": "Hello I'm ready."},
|
||||
[{"role": "assistant", "content": "Hello I'm ready."},
|
||||
{"role": "user", "content": "What's the capital of? France"}]
|
||||
Output: YES
|
||||
|
||||
@@ -175,12 +175,12 @@ Output: YES
|
||||
|
||||
Examples:
|
||||
# Acknowledgment
|
||||
[{"role": "assistant", "content": "Should we talk about history?"},
|
||||
[{"role": "assistant", "content": "Should we talk about history?"},
|
||||
{"role": "user", "content": "Sure"}]
|
||||
Output: YES
|
||||
|
||||
# Disagreement with completion
|
||||
[{"role": "assistant", "content": "Is that what you meant?"},
|
||||
[{"role": "assistant", "content": "Is that what you meant?"},
|
||||
{"role": "user", "content": "No not really"}]
|
||||
Output: YES
|
||||
|
||||
@@ -194,12 +194,12 @@ LOW PRIORITY SIGNALS:
|
||||
|
||||
Examples:
|
||||
# Word repetition but complete
|
||||
[{"role": "assistant", "content": "I can help with that."},
|
||||
[{"role": "assistant", "content": "I can help with that."},
|
||||
{"role": "user", "content": "What what is the time right now"}]
|
||||
Output: YES
|
||||
|
||||
# Missing punctuation but complete
|
||||
[{"role": "assistant", "content": "I can explain that."},
|
||||
[{"role": "assistant", "content": "I can explain that."},
|
||||
{"role": "user", "content": "Please tell me how computers work"}]
|
||||
Output: YES
|
||||
|
||||
@@ -211,12 +211,12 @@ Output: YES
|
||||
|
||||
Examples:
|
||||
# Filler words but complete
|
||||
[{"role": "assistant", "content": "What would you like to know?"},
|
||||
[{"role": "assistant", "content": "What would you like to know?"},
|
||||
{"role": "user", "content": "Um uh how do airplanes fly"}]
|
||||
Output: YES
|
||||
|
||||
# Thinking pause but incomplete
|
||||
[{"role": "assistant", "content": "I can explain anything."},
|
||||
[{"role": "assistant", "content": "I can explain anything."},
|
||||
{"role": "user", "content": "Well um I want to know about the"}]
|
||||
Output: NO
|
||||
|
||||
@@ -241,17 +241,17 @@ DECISION RULES:
|
||||
|
||||
Examples:
|
||||
# Incomplete despite corrections
|
||||
[{"role": "assistant", "content": "What would you like to know about?"},
|
||||
[{"role": "assistant", "content": "What would you like to know about?"},
|
||||
{"role": "user", "content": "Can you tell me about"}]
|
||||
Output: NO
|
||||
|
||||
# Complete despite multiple artifacts
|
||||
[{"role": "assistant", "content": "I can help you learn."},
|
||||
[{"role": "assistant", "content": "I can help you learn."},
|
||||
{"role": "user", "content": "How do you I mean what's the best way to learn programming"}]
|
||||
Output: YES
|
||||
|
||||
# Trailing off incomplete
|
||||
[{"role": "assistant", "content": "I can explain anything."},
|
||||
[{"role": "assistant", "content": "I can explain anything."},
|
||||
{"role": "user", "content": "I was wondering if you could tell me why"}]
|
||||
Output: NO
|
||||
"""
|
||||
@@ -374,8 +374,7 @@ class OutputGate(FrameProcessor):
|
||||
self._gate_task = self.get_event_loop().create_task(self._gate_task_handler())
|
||||
|
||||
async def _stop(self):
|
||||
self._gate_task.cancel()
|
||||
await self._gate_task
|
||||
await self.cancel_task(self._gate_task)
|
||||
|
||||
async def _gate_task_handler(self):
|
||||
while True:
|
||||
|
||||
@@ -44,9 +44,7 @@ from pipecat.processors.aggregators.openai_llm_context import (
|
||||
)
|
||||
from pipecat.processors.filters.function_filter import FunctionFilter
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.processors.user_idle_processor import UserIdleProcessor
|
||||
from pipecat.services.cartesia import CartesiaTTSService
|
||||
from pipecat.services.deepgram import DeepgramSTTService
|
||||
from pipecat.services.google import GoogleLLMContext, GoogleLLMService
|
||||
from pipecat.sync.base_notifier import BaseNotifier
|
||||
from pipecat.sync.event_notifier import EventNotifier
|
||||
@@ -440,11 +438,11 @@ class CompletenessCheck(FrameProcessor):
|
||||
|
||||
if isinstance(frame, UserStartedSpeakingFrame):
|
||||
if self._idle_task:
|
||||
self._idle_task.cancel()
|
||||
await self.cancel_task(self._idle_task)
|
||||
elif isinstance(frame, TextFrame) and frame.text.startswith("YES"):
|
||||
logger.debug("Completeness check YES")
|
||||
if self._idle_task:
|
||||
self._idle_task.cancel()
|
||||
await self.cancel_task(self._idle_task)
|
||||
await self.push_frame(UserStoppedSpeakingFrame())
|
||||
await self._audio_accumulator.reset()
|
||||
await self._notifier.notify()
|
||||
@@ -602,8 +600,7 @@ class OutputGate(FrameProcessor):
|
||||
self._gate_task = self.get_event_loop().create_task(self._gate_task_handler())
|
||||
|
||||
async def _stop(self):
|
||||
self._gate_task.cancel()
|
||||
await self._gate_task
|
||||
await self.cancel_task(self._gate_task)
|
||||
|
||||
async def _gate_task_handler(self):
|
||||
while True:
|
||||
|
||||
@@ -15,6 +15,7 @@ from runner import configure
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.audio.vad.vad_analyzer import VADParams
|
||||
from pipecat.frames.frames import LLMMessagesAppendFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
@@ -71,6 +72,21 @@ async def main():
|
||||
),
|
||||
)
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
await task.queue_frames(
|
||||
[
|
||||
LLMMessagesAppendFrame(
|
||||
messages=[
|
||||
{
|
||||
"role": "assistant",
|
||||
"content": "Greet the user.",
|
||||
}
|
||||
]
|
||||
)
|
||||
]
|
||||
)
|
||||
|
||||
runner = PipelineRunner()
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
130
examples/foundational/31-gemini-grounding-metadata.py
Normal file
@@ -0,0 +1,130 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
import aiohttp
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import Frame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.services.cartesia import CartesiaTTSService
|
||||
from pipecat.services.deepgram import DeepgramSTTService
|
||||
from pipecat.services.google import GoogleLLMService, LLMSearchResponseFrame
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
|
||||
sys.path.append(str(Path(__file__).parent.parent))
|
||||
from runner import configure
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
logger.remove(0)
|
||||
logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
# Google Search retrieval tool passed to the LLM
|
||||
search_tool = {"google_search_retrieval": {}}
|
||||
tools = [search_tool]
|
||||
|
||||
system_instruction = """
|
||||
You are an expert at providing the most recent news from any place. Your responses will be converted to audio, so avoid using special characters or overly complex formatting.
|
||||
|
||||
Always use the google search API to retrieve the latest news. You must also use it to check which day is today.
|
||||
|
||||
You can:
|
||||
- Use the Google search API to check the current date.
|
||||
- Provide the most recent and relevant news from any place by using the google search API.
|
||||
- Answer any questions the user may have, ensuring your responses are accurate and concise.
|
||||
|
||||
Start each interaction by asking the user which place they would like news about.
|
||||
"""
|
||||
|
||||
|
||||
class LLMSearchLoggerProcessor(FrameProcessor):
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, LLMSearchResponseFrame):
|
||||
print(f"LLMSearchLoggerProcessor: {frame}")
|
||||
|
||||
await self.push_frame(frame)
|
||||
|
||||
|
||||
async def main():
|
||||
async with aiohttp.ClientSession() as session:
|
||||
(room_url, token) = await configure(session)
|
||||
|
||||
transport = DailyTransport(
|
||||
room_url,
|
||||
token,
|
||||
"Latest news!",
|
||||
DailyParams(
|
||||
audio_out_enabled=True,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
vad_audio_passthrough=True,
|
||||
),
|
||||
)
|
||||
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
|
||||
)
|
||||
|
||||
# Initialize the Gemini LLM service with the search tool
|
||||
llm = GoogleLLMService(
|
||||
api_key=os.getenv("GOOGLE_API_KEY"),
|
||||
system_instruction=system_instruction,
|
||||
tools=tools,
|
||||
)
|
||||
|
||||
context = OpenAILLMContext(
|
||||
[
|
||||
{
|
||||
"role": "user",
|
||||
"content": "Start by greeting the user warmly, introducing yourself, and mentioning the current day. Be friendly and engaging to set a positive tone for the interaction.",
|
||||
}
|
||||
],
|
||||
)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
llm_search_logger = LLMSearchLoggerProcessor()
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(),
|
||||
stt,
|
||||
context_aggregator.user(),
|
||||
llm,
|
||||
llm_search_logger,
|
||||
tts,
|
||||
transport.output(),
|
||||
context_aggregator.assistant(),
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
|
||||
runner = PipelineRunner()
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
51
examples/news-chatbot/.gitignore
vendored
Normal file
@@ -0,0 +1,51 @@
|
||||
# Python
|
||||
__pycache__/
|
||||
*.py[cod]
|
||||
*$py.class
|
||||
*.so
|
||||
.Python
|
||||
build/
|
||||
dist/
|
||||
*.egg-info/
|
||||
.installed.cfg
|
||||
*.egg
|
||||
.pytest_cache/
|
||||
.coverage
|
||||
.coverage.*
|
||||
.env
|
||||
.venv
|
||||
env/
|
||||
venv/
|
||||
ENV/
|
||||
.mypy_cache/
|
||||
.dmypy.json
|
||||
dmypy.json
|
||||
|
||||
# JavaScript/Node.js
|
||||
node_modules/
|
||||
dist/
|
||||
dist-ssr/
|
||||
*.local
|
||||
.env.local
|
||||
.env.development.local
|
||||
.env.test.local
|
||||
.env.production.local
|
||||
|
||||
# Logs
|
||||
logs/
|
||||
*.log
|
||||
npm-debug.log*
|
||||
yarn-debug.log*
|
||||
yarn-error.log*
|
||||
pnpm-debug.log*
|
||||
|
||||
# Editor/IDE
|
||||
.vscode/*
|
||||
!.vscode/extensions.json
|
||||
.idea/
|
||||
*.swp
|
||||
*.swo
|
||||
.DS_Store
|
||||
|
||||
# Project specific
|
||||
runpod.toml
|
||||
48
examples/news-chatbot/README.md
Normal file
@@ -0,0 +1,48 @@
|
||||
# News Chatbot
|
||||
|
||||
A simple AI-powered chatbot that leverages Gemini's real-time search capabilities in a voice AI application.
|
||||
|
||||
This example demonstrates Gemini's ability to query Google search in real time and return relevant responses, including links to the URLs that Gemini searched.
|
||||
|
||||
All the details about grounding with Google Search can be found [here](https://ai.google.dev/gemini-api/docs/grounding?lang=python).
|
||||
|
||||
## Quick Start
|
||||
|
||||
### First, start the bot server:
|
||||
|
||||
1. Navigate to the server directory:
|
||||
```bash
|
||||
cd server
|
||||
```
|
||||
2. Create and activate a virtual environment:
|
||||
```bash
|
||||
python3 -m venv venv
|
||||
source venv/bin/activate # On Windows: venv\Scripts\activate
|
||||
```
|
||||
3. Install requirements:
|
||||
```bash
|
||||
pip install -r requirements.txt
|
||||
```
|
||||
4. Copy env.example to .env and configure:
|
||||
- Add your API keys
|
||||
5. Start the server:
|
||||
```bash
|
||||
python server.py
|
||||
```
|
||||
|
||||
### Next, connect using the client app:
|
||||
|
||||
For client-side setup, refer to the [JavaScript Guide](client/javascript/README.md).
|
||||
|
||||
## Important Note
|
||||
|
||||
Ensure the bot server is running before using any client implementations.
|
||||
|
||||
## Requirements
|
||||
|
||||
- Python 3.10+
|
||||
- Node.js 18+ (for the JavaScript client)
|
||||
- Daily API key
|
||||
- Gemini API key (for Gemini bot)
|
||||
- Cartesia API key
|
||||
- Modern web browser with WebRTC support
|
||||
27
examples/news-chatbot/client/javascript/README.md
Normal file
@@ -0,0 +1,27 @@
|
||||
# JavaScript Implementation
|
||||
|
||||
Basic implementation using the [Pipecat JavaScript SDK](https://docs.pipecat.ai/client/js/introduction).
|
||||
|
||||
## Setup
|
||||
|
||||
1. Run the bot server. See the [server README](../../README).
|
||||
|
||||
2. Navigate to the `client/javascript` directory:
|
||||
|
||||
```bash
|
||||
cd client/javascript
|
||||
```
|
||||
|
||||
3. Install dependencies:
|
||||
|
||||
```bash
|
||||
npm install
|
||||
```
|
||||
|
||||
4. Run the client app:
|
||||
|
||||
```bash
|
||||
npm run dev
|
||||
```
|
||||
|
||||
5. Visit http://localhost:5173 in your browser.
|
||||
40
examples/news-chatbot/client/javascript/index.html
Normal file
@@ -0,0 +1,40 @@
|
||||
<!DOCTYPE html>
|
||||
<html lang="en">
|
||||
|
||||
<head>
|
||||
<meta charset="UTF-8">
|
||||
<meta name="viewport" content="width=device-width, initial-scale=1.0">
|
||||
<title>AI Chatbot</title>
|
||||
</head>
|
||||
|
||||
<body>
|
||||
<div class="container">
|
||||
<div class="status-bar">
|
||||
<div class="status">
|
||||
Status: <span id="connection-status">Disconnected</span>
|
||||
</div>
|
||||
<div class="controls">
|
||||
<button id="connect-btn">Connect</button>
|
||||
<button id="disconnect-btn" disabled>Disconnect</button>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div class="main-content">
|
||||
<div class="bot-container">
|
||||
<div id="search-result-container">
|
||||
</div>
|
||||
<audio id="bot-audio" autoplay></audio>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div class="debug-panel">
|
||||
<h3>Debug Info</h3>
|
||||
<div id="debug-log"></div>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<script type="module" src="/src/app.js"></script>
|
||||
<link rel="stylesheet" href="/src/style.css">
|
||||
</body>
|
||||
|
||||
</html>
|
||||
1176
examples/news-chatbot/client/javascript/package-lock.json
generated
Normal file
File diff suppressed because it is too large
21
examples/news-chatbot/client/javascript/package.json
Normal file
@@ -0,0 +1,21 @@
|
||||
{
|
||||
"name": "client",
|
||||
"version": "1.0.0",
|
||||
"main": "index.js",
|
||||
"scripts": {
|
||||
"dev": "vite",
|
||||
"build": "vite build",
|
||||
"preview": "vite preview"
|
||||
},
|
||||
"keywords": [],
|
||||
"author": "",
|
||||
"license": "ISC",
|
||||
"description": "",
|
||||
"devDependencies": {
|
||||
"vite": "^6.0.2"
|
||||
},
|
||||
"dependencies": {
|
||||
"@pipecat-ai/client-js": "^0.3.2",
|
||||
"@pipecat-ai/daily-transport": "^0.3.4"
|
||||
}
|
||||
}
|
||||
341
examples/news-chatbot/client/javascript/src/app.js
Normal file
@@ -0,0 +1,341 @@
|
||||
/**
|
||||
* Copyright (c) 2024–2025, Daily
|
||||
*
|
||||
* SPDX-License-Identifier: BSD 2-Clause License
|
||||
*/
|
||||
|
||||
/**
|
||||
* RTVI Client Implementation
|
||||
*
|
||||
* This client connects to an RTVI-compatible bot server using WebRTC (via Daily).
|
||||
* It handles audio/video streaming and manages the connection lifecycle.
|
||||
*
|
||||
* Requirements:
|
||||
* - A running RTVI bot server (defaults to http://localhost:7860)
|
||||
* - The server must implement the /connect endpoint that returns Daily.co room credentials
|
||||
* - Browser with WebRTC support
|
||||
*/
|
||||
|
||||
import {LogLevel, RTVIClient, RTVIClientHelper, RTVIEvent} from '@pipecat-ai/client-js';
|
||||
import { DailyTransport } from '@pipecat-ai/daily-transport';
|
||||
|
||||
class SearchResponseHelper extends RTVIClientHelper {
|
||||
|
||||
constructor(contentPanel) {
|
||||
super()
|
||||
this.contentPanel = contentPanel
|
||||
}
|
||||
|
||||
handleMessage(rtviMessage) {
|
||||
console.log("SearchResponseHelper, received message:", rtviMessage)
|
||||
if (rtviMessage.data) {
|
||||
// Clear existing content
|
||||
this.contentPanel.innerHTML = "";
|
||||
|
||||
// Create a container for all content
|
||||
const contentContainer = document.createElement('div');
|
||||
contentContainer.className = "content-container";
|
||||
|
||||
// Add the search_result
|
||||
if (rtviMessage.data.search_result) {
|
||||
const searchResultDiv = document.createElement('div');
|
||||
searchResultDiv.className = "search-result";
|
||||
searchResultDiv.textContent = rtviMessage.data.search_result;
|
||||
contentContainer.appendChild(searchResultDiv);
|
||||
}
|
||||
|
||||
// Add the sources
|
||||
if (rtviMessage.data.origins) {
|
||||
const sourcesDiv = document.createElement('div');
|
||||
sourcesDiv.className = "sources";
|
||||
|
||||
const sourcesTitle = document.createElement('h3');
|
||||
sourcesTitle.className = "sources-title";
|
||||
sourcesTitle.textContent = "Sources:";
|
||||
sourcesDiv.appendChild(sourcesTitle);
|
||||
|
||||
rtviMessage.data.origins.forEach(origin => {
|
||||
const sourceLink = document.createElement('a');
|
||||
sourceLink.className = "source-link";
|
||||
sourceLink.href = origin.site_uri;
|
||||
sourceLink.target = "_blank";
|
||||
sourceLink.textContent = origin.site_title;
|
||||
sourcesDiv.appendChild(sourceLink);
|
||||
});
|
||||
|
||||
contentContainer.appendChild(sourcesDiv);
|
||||
}
|
||||
|
||||
// Add the rendered_content in an iframe
|
||||
if (rtviMessage.data.rendered_content) {
|
||||
const iframe = document.createElement('iframe');
|
||||
iframe.className = "iframe-container";
|
||||
iframe.srcdoc = rtviMessage.data.rendered_content;
|
||||
contentContainer.appendChild(iframe);
|
||||
}
|
||||
|
||||
// Append the content container to the content panel
|
||||
this.contentPanel.appendChild(contentContainer);
|
||||
}
|
||||
}
|
||||
|
||||
getMessageTypes() {
|
||||
return ["bot-llm-search-response"]
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* ChatbotClient handles the connection and media management for a real-time
|
||||
* voice and video interaction with an AI bot.
|
||||
*/
|
||||
class ChatbotClient {
|
||||
constructor() {
|
||||
// Initialize client state
|
||||
this.rtviClient = null;
|
||||
this.setupDOMElements();
|
||||
this.setupEventListeners();
|
||||
}
|
||||
|
||||
/**
|
||||
* Set up references to DOM elements and create necessary media elements
|
||||
*/
|
||||
setupDOMElements() {
|
||||
// Get references to UI control elements
|
||||
this.connectBtn = document.getElementById('connect-btn');
|
||||
this.disconnectBtn = document.getElementById('disconnect-btn');
|
||||
this.statusSpan = document.getElementById('connection-status');
|
||||
this.debugLog = document.getElementById('debug-log');
|
||||
this.searchResultContainer = document.getElementById('search-result-container');
|
||||
|
||||
// Create an audio element for bot's voice output
|
||||
this.botAudio = document.createElement('audio');
|
||||
this.botAudio.autoplay = true;
|
||||
this.botAudio.playsInline = true;
|
||||
document.body.appendChild(this.botAudio);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set up event listeners for connect/disconnect buttons
|
||||
*/
|
||||
setupEventListeners() {
|
||||
this.connectBtn.addEventListener('click', () => this.connect());
|
||||
this.disconnectBtn.addEventListener('click', () => this.disconnect());
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a timestamped message to the debug log
|
||||
*/
|
||||
log(message) {
|
||||
const entry = document.createElement('div');
|
||||
entry.textContent = `${new Date().toISOString()} - ${message}`;
|
||||
|
||||
// Add styling based on message type
|
||||
if (message.startsWith('User: ')) {
|
||||
entry.style.color = '#2196F3'; // blue for user
|
||||
} else if (message.startsWith('Bot: ')) {
|
||||
entry.style.color = '#4CAF50'; // green for bot
|
||||
}
|
||||
|
||||
this.debugLog.appendChild(entry);
|
||||
this.debugLog.scrollTop = this.debugLog.scrollHeight;
|
||||
console.log(message);
|
||||
}
|
||||
|
||||
/**
|
||||
* Update the connection status display
|
||||
*/
|
||||
updateStatus(status) {
|
||||
this.statusSpan.textContent = status;
|
||||
this.log(`Status: ${status}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check for available media tracks and set them up if present
|
||||
* This is called when the bot is ready or when the transport state changes to ready
|
||||
*/
|
||||
setupMediaTracks() {
|
||||
if (!this.rtviClient) return;
|
||||
|
||||
// Get current tracks from the client
|
||||
const tracks = this.rtviClient.tracks();
|
||||
|
||||
// Set up any available bot tracks
|
||||
if (tracks.bot?.audio) {
|
||||
this.setupAudioTrack(tracks.bot.audio);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Set up listeners for track events (start/stop)
|
||||
* This handles new tracks being added during the session
|
||||
*/
|
||||
setupTrackListeners() {
|
||||
if (!this.rtviClient) return;
|
||||
|
||||
// Listen for new tracks starting
|
||||
this.rtviClient.on(RTVIEvent.TrackStarted, (track, participant) => {
|
||||
// Only handle non-local (bot) tracks
|
||||
if (!participant?.local && track.kind === 'audio') {
|
||||
this.setupAudioTrack(track);
|
||||
}
|
||||
});
|
||||
|
||||
// Listen for tracks stopping
|
||||
this.rtviClient.on(RTVIEvent.TrackStopped, (track, participant) => {
|
||||
this.log(
|
||||
`Track stopped event: ${track.kind} from ${
|
||||
participant?.name || 'unknown'
|
||||
}`
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Set up an audio track for playback
|
||||
* Handles both initial setup and track updates
|
||||
*/
|
||||
setupAudioTrack(track) {
|
||||
this.log('Setting up audio track');
|
||||
// Check if we're already playing this track
|
||||
if (this.botAudio.srcObject) {
|
||||
const oldTrack = this.botAudio.srcObject.getAudioTracks()[0];
|
||||
if (oldTrack?.id === track.id) return;
|
||||
}
|
||||
// Create a new MediaStream with the track and set it as the audio source
|
||||
this.botAudio.srcObject = new MediaStream([track]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize and connect to the bot
|
||||
* This sets up the RTVI client, initializes devices, and establishes the connection
|
||||
*/
|
||||
async connect() {
|
||||
try {
|
||||
// Create a new Daily transport for WebRTC communication
|
||||
const transport = new DailyTransport();
|
||||
|
||||
// Initialize the RTVI client with our configuration
|
||||
this.rtviClient = new RTVIClient({
|
||||
transport,
|
||||
params: {
|
||||
// The baseURL and endpoint of your bot server that the client will connect to
|
||||
baseUrl: 'http://localhost:7860',
|
||||
endpoints: {
|
||||
connect: '/connect',
|
||||
},
|
||||
},
|
||||
enableMic: true, // Enable microphone for user input
|
||||
enableCam: false,
|
||||
callbacks: {
|
||||
// Handle connection state changes
|
||||
onConnected: () => {
|
||||
this.updateStatus('Connected');
|
||||
this.connectBtn.disabled = true;
|
||||
this.disconnectBtn.disabled = false;
|
||||
this.log('Client connected');
|
||||
},
|
||||
onDisconnected: () => {
|
||||
this.updateStatus('Disconnected');
|
||||
this.connectBtn.disabled = false;
|
||||
this.disconnectBtn.disabled = true;
|
||||
this.log('Client disconnected');
|
||||
},
|
||||
// Handle transport state changes
|
||||
onTransportStateChanged: (state) => {
|
||||
this.updateStatus(`Transport: ${state}`);
|
||||
this.log(`Transport state changed: ${state}`);
|
||||
if (state === 'ready') {
|
||||
this.setupMediaTracks();
|
||||
}
|
||||
},
|
||||
// Handle bot connection events
|
||||
onBotConnected: (participant) => {
|
||||
this.log(`Bot connected: ${JSON.stringify(participant)}`);
|
||||
},
|
||||
onBotDisconnected: (participant) => {
|
||||
this.log(`Bot disconnected: ${JSON.stringify(participant)}`);
|
||||
},
|
||||
onBotReady: (data) => {
|
||||
this.log(`Bot ready: ${JSON.stringify(data)}`);
|
||||
this.setupMediaTracks();
|
||||
},
|
||||
// Transcript events
|
||||
onUserTranscript: (data) => {
|
||||
// Only log final transcripts
|
||||
if (data.final) {
|
||||
this.log(`User: ${data.text}`);
|
||||
}
|
||||
},
|
||||
onBotTranscript: (data) => {
|
||||
this.log(`Bot: ${data.text}`);
|
||||
},
|
||||
// Error handling
|
||||
onMessageError: (error) => {
|
||||
console.log('Message error:', error);
|
||||
},
|
||||
onError: (error) => {
|
||||
console.log('Error:', error);
|
||||
},
|
||||
},
|
||||
});
|
||||
//this.rtviClient.setLogLevel(LogLevel.DEBUG)
|
||||
this.rtviClient.registerHelper("llm", new SearchResponseHelper(this.searchResultContainer))
|
||||
|
||||
// Set up listeners for media track events
|
||||
this.setupTrackListeners();
|
||||
|
||||
// Initialize audio devices
|
||||
this.log('Initializing devices...');
|
||||
await this.rtviClient.initDevices();
|
||||
|
||||
// Connect to the bot
|
||||
this.log('Connecting to bot...');
|
||||
await this.rtviClient.connect();
|
||||
|
||||
this.log('Connection complete');
|
||||
} catch (error) {
|
||||
// Handle any errors during connection
|
||||
this.log(`Error connecting: ${error.message}`);
|
||||
this.log(`Error stack: ${error.stack}`);
|
||||
this.updateStatus('Error');
|
||||
|
||||
// Clean up if there's an error
|
||||
if (this.rtviClient) {
|
||||
try {
|
||||
await this.rtviClient.disconnect();
|
||||
} catch (disconnectError) {
|
||||
this.log(`Error during disconnect: ${disconnectError.message}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Disconnect from the bot and clean up media resources
|
||||
*/
|
||||
async disconnect() {
|
||||
if (this.rtviClient) {
|
||||
try {
|
||||
// Disconnect the RTVI client
|
||||
await this.rtviClient.disconnect();
|
||||
this.rtviClient = null;
|
||||
|
||||
// Clean up audio
|
||||
if (this.botAudio.srcObject) {
|
||||
this.botAudio.srcObject.getTracks().forEach((track) => track.stop());
|
||||
this.botAudio.srcObject = null;
|
||||
}
|
||||
|
||||
// Clear the search results panel
|
||||
this.searchResultContainer.innerHTML = '';
|
||||
} catch (error) {
|
||||
this.log(`Error disconnecting: ${error.message}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Initialize the client when the page loads
|
||||
window.addEventListener('DOMContentLoaded', () => {
|
||||
new ChatbotClient();
|
||||
});
|
||||
134
examples/news-chatbot/client/javascript/src/style.css
Normal file
@@ -0,0 +1,134 @@
|
||||
body {
|
||||
margin: 0;
|
||||
padding: 20px;
|
||||
font-family: Arial, sans-serif;
|
||||
background-color: #f0f0f0;
|
||||
}
|
||||
|
||||
.container {
|
||||
max-width: 1200px;
|
||||
margin: 0 auto;
|
||||
}
|
||||
|
||||
.status-bar {
|
||||
display: flex;
|
||||
justify-content: space-between;
|
||||
align-items: center;
|
||||
padding: 10px;
|
||||
background-color: #fff;
|
||||
border-radius: 8px;
|
||||
margin-bottom: 20px;
|
||||
}
|
||||
|
||||
.controls button {
|
||||
padding: 8px 16px;
|
||||
margin-left: 10px;
|
||||
border: none;
|
||||
border-radius: 4px;
|
||||
cursor: pointer;
|
||||
}
|
||||
|
||||
#connect-btn {
|
||||
background-color: #4caf50;
|
||||
color: white;
|
||||
}
|
||||
|
||||
#disconnect-btn {
|
||||
background-color: #f44336;
|
||||
color: white;
|
||||
}
|
||||
|
||||
button:disabled {
|
||||
opacity: 0.5;
|
||||
cursor: not-allowed;
|
||||
}
|
||||
|
||||
.main-content {
|
||||
background-color: #fff;
|
||||
border-radius: 8px;
|
||||
padding: 20px;
|
||||
margin-bottom: 20px;
|
||||
}
|
||||
|
||||
.bot-container {
|
||||
display: flex;
|
||||
flex-direction: column;
|
||||
align-items: center;
|
||||
}
|
||||
|
||||
#search-result-container {
|
||||
background-color: #e0e0e0;
|
||||
padding: 20px;
|
||||
width: calc(100% - 40px);
|
||||
height: 450px;
|
||||
overflow: auto;
|
||||
}
|
||||
|
||||
/* Container for all content */
|
||||
.content-container {
|
||||
display: flex;
|
||||
flex-direction: column;
|
||||
gap: 20px; /* Space between elements */
|
||||
font-family: Arial, sans-serif;
|
||||
}
|
||||
|
||||
/* Styles for the search result */
|
||||
.search-result {
|
||||
font-size: 16px;
|
||||
line-height: 1.5;
|
||||
color: #333;
|
||||
}
|
||||
|
||||
/* Styles for the sources container */
|
||||
.sources {
|
||||
display: flex;
|
||||
flex-direction: column;
|
||||
gap: 8px; /* Space between source links */
|
||||
}
|
||||
|
||||
.sources-title {
|
||||
font-size: 16px;
|
||||
font-weight: bold;
|
||||
color: #444;
|
||||
}
|
||||
|
||||
/* Styles for source links */
|
||||
.source-link {
|
||||
text-decoration: none;
|
||||
color: #1a73e8;
|
||||
}
|
||||
|
||||
.source-link:hover {
|
||||
text-decoration: underline;
|
||||
}
|
||||
|
||||
/* Styles for the iframe container */
|
||||
.iframe-container {
|
||||
flex: none;
|
||||
width: 100%;
|
||||
height: 400px; /* Adjust height as needed */
|
||||
border: none;
|
||||
}
|
||||
|
||||
.debug-panel {
|
||||
background-color: #fff;
|
||||
border-radius: 8px;
|
||||
padding: 20px;
|
||||
}
|
||||
|
||||
.debug-panel h3 {
|
||||
margin: 0 0 10px 0;
|
||||
font-size: 16px;
|
||||
font-weight: bold;
|
||||
}
|
||||
|
||||
#debug-log {
|
||||
height: 200px;
|
||||
overflow-y: auto;
|
||||
background-color: #f8f8f8;
|
||||
padding: 10px;
|
||||
border-radius: 4px;
|
||||
font-family: monospace;
|
||||
font-size: 12px;
|
||||
line-height: 1.4;
|
||||
}
|
||||
13
examples/news-chatbot/client/javascript/vite.config.js
Normal file
@@ -0,0 +1,13 @@
|
||||
import { defineConfig } from 'vite';
|
||||
|
||||
export default defineConfig({
|
||||
server: {
|
||||
proxy: {
|
||||
// Proxy /connect requests to the backend server
|
||||
'/connect': {
|
||||
target: 'http://0.0.0.0:7860', // Replace with your backend URL
|
||||
changeOrigin: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
});
|
||||
52
examples/news-chatbot/server/README.md
Normal file
@@ -0,0 +1,52 @@
|
||||
# News Chatbot Server
|
||||
|
||||
A FastAPI server that manages bot instances and provides an endpoint for Pipecat client connections.
|
||||
|
||||
## Endpoints
|
||||
|
||||
- `POST /connect` - Pipecat client connection endpoint
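
For a quick manual check of the endpoint without the browser client, something like the following sketch may help. It uses only the Python standard library and assumes the server is listening on the default `http://localhost:7860`. The empty JSON request body and the JSON response are assumptions, not confirmed by this README, and the helper names `build_connect_request` and `connect` are hypothetical.

```python
import json
import urllib.request


def build_connect_request(base_url="http://localhost:7860"):
    """Build a POST request for the /connect endpoint.

    Assumption: the server accepts an empty JSON object as the body.
    """
    return urllib.request.Request(
        f"{base_url.rstrip('/')}/connect",
        data=b"{}",
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def connect(base_url="http://localhost:7860"):
    """Send the request and decode the reply.

    Assumption: the reply is JSON (for example, room credentials).
    """
    request = build_connect_request(base_url)
    with urllib.request.urlopen(request, timeout=10) as response:
        return json.loads(response.read().decode("utf-8"))
```

With the server running, `connect()` should return whatever the endpoint sends back. If the server expects a different request body, adjust `data` accordingly.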
|
||||
|
||||
## Environment Variables
|
||||
|
||||
Copy `env.example` to `.env` and configure:
|
||||
|
||||
```ini
|
||||
# Required API Keys
|
||||
DAILY_API_KEY= # Your Daily API key
|
||||
DEEPGRAM_API_KEY= # Your Deepgram API key
|
||||
GOOGLE_API_KEY= # Your Google/Gemini API key
|
||||
CARTESIA_API_KEY= # Your Cartesia API key
|
||||
|
||||
# Optional Configuration
|
||||
DAILY_API_URL= # Optional: Daily API URL (defaults to https://api.daily.co/v1)
|
||||
DAILY_SAMPLE_ROOM_URL= # Optional: Fixed room URL for development
|
||||
HOST= # Optional: Host address (defaults to 0.0.0.0)
|
||||
FAST_API_PORT= # Optional: Port number (defaults to 7860)
|
||||
```
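
A missing key usually only shows up once a service is first used, so it can be handy to verify the configuration before starting the server. The sketch below checks the four required variables listed above. The helper `missing_keys` is hypothetical and not part of this example's code, and it assumes the variables are already loaded into the process environment (for instance via `load_dotenv`).

```python
import os

# Required variables, taken from the list above.
REQUIRED_KEYS = [
    "DAILY_API_KEY",
    "DEEPGRAM_API_KEY",
    "GOOGLE_API_KEY",
    "CARTESIA_API_KEY",
]


def missing_keys(env=None):
    """Return the required variable names that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_KEYS if not env.get(name)]


if __name__ == "__main__":
    missing = missing_keys()
    if missing:
        raise SystemExit(f"Missing required environment variables: {', '.join(missing)}")
    print("All required API keys are set.")
```

Passing a plain dictionary to `missing_keys` makes the check easy to exercise without touching the real environment.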
|
||||
|
||||
## Running the Server
|
||||
|
||||
Set up and activate your virtual environment:
|
||||
|
||||
```bash
|
||||
python3 -m venv venv
|
||||
source venv/bin/activate # On Windows: venv\Scripts\activate
|
||||
```
|
||||
|
||||
Install dependencies:
|
||||
|
||||
```bash
|
||||
pip install -r requirements.txt
|
||||
```
|
||||
|
||||
If you want to use the local version of `pipecat` in this repo rather than the last published version, also run:
|
||||
|
||||
```bash
|
||||
pip install --editable "../../../[daily,deepgram,google,cartesia,openai,silero]"
|
||||
```
|
||||
|
||||
Run the server:
|
||||
|
||||
```bash
|
||||
python server.py
|
||||
```
|
||||
5
examples/news-chatbot/server/env.example
Normal file
@@ -0,0 +1,5 @@
|
||||
DAILY_SAMPLE_ROOM_URL=https://yourdomain.daily.co/yourroom # (for joining the bot to the same room repeatedly for local dev)
|
||||
DAILY_API_KEY=
|
||||
CARTESIA_API_KEY=
|
||||
DEEPGRAM_API_KEY=
|
||||
GOOGLE_API_KEY=
|
||||
166
examples/news-chatbot/server/news_bot.py
Normal file
@@ -0,0 +1,166 @@
|
||||
#
|
||||
# Copyright (c) 2024, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
import aiohttp
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import EndFrame, Frame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIProcessor
|
||||
from pipecat.services.cartesia import CartesiaTTSService
|
||||
from pipecat.services.deepgram import DeepgramSTTService
|
||||
from pipecat.services.google import GoogleLLMService, LLMSearchResponseFrame
|
||||
from pipecat.transports.services.daily import DailyParams, DailyTransport
|
||||
from pipecat.utils.text.markdown_text_filter import MarkdownTextFilter
|
||||
|
||||
sys.path.append(str(Path(__file__).parent.parent))
|
||||
from runner import configure
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
logger.remove(0)
|
||||
logger.add(sys.stderr, level="DEBUG")
|
||||
|
||||
|
||||
# Function handlers for the LLM
|
||||
# https://ai.google.dev/gemini-api/docs/grounding?lang=python#dynamic-retrieval
|
||||
# Some queries are likely to benefit more from Grounding with Google Search than others.
|
||||
# The dynamic retrieval feature gives you additional control over when to use Grounding with Google Search.
|
||||
# If the dynamic retrieval mode is unspecified, Grounding with Google Search is always triggered.
|
||||
# If the mode is set to dynamic, the model decides when to use grounding based on a threshold that you can configure.
|
||||
# The threshold is a floating-point value in the range [0,1] and defaults to 0.3.
|
||||
# If the threshold value is 0, the response is always grounded with Google Search; if it's 1, it never is.
|
||||
search_tool = {
|
||||
"google_search_retrieval": {
|
||||
"dynamic_retrieval_config": {
|
||||
"mode": "MODE_DYNAMIC",
|
||||
"dynamic_threshold": 0,
|
||||
} # always grounding
|
||||
}
|
||||
}
|
||||
tools = [search_tool]
|
||||
|
||||
system_instruction = """
|
||||
You are an expert at providing the most recent news from any place. Your responses will be converted to audio, so ensure they are formatted in plain text without special characters (e.g., *, _, -) or overly complex formatting.
|
||||
|
||||
Guidelines:
|
||||
- Use the Google search API to retrieve the current date and provide the latest news.
|
||||
- Always deliver accurate and concise responses.
|
||||
- Ensure all responses are clear, using plain text only. Avoid any special characters or symbols.
|
||||
|
||||
Start every interaction by asking how you can assist the user.
|
||||
"""
|
||||
|
||||
|
||||
class LLMSearchLoggerProcessor(FrameProcessor):
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, LLMSearchResponseFrame):
|
||||
print(f"LLMSearchLoggerProcessor: {frame}")
|
||||
|
||||
await self.push_frame(frame)
|
||||
|
||||
|
||||
async def main():
|
||||
async with aiohttp.ClientSession() as session:
|
||||
(room_url, token) = await configure(session)
|
||||
|
||||
transport = DailyTransport(
|
||||
room_url,
|
||||
token,
|
||||
"Latest news!",
|
||||
DailyParams(
|
||||
audio_out_enabled=True,
|
||||
vad_enabled=True,
|
||||
vad_analyzer=SileroVADAnalyzer(),
|
||||
vad_audio_passthrough=True,
|
||||
),
|
||||
)
|
||||
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
|
||||
text_filter=MarkdownTextFilter(),
|
||||
)
|
||||
|
||||
llm = GoogleLLMService(
|
||||
api_key=os.getenv("GOOGLE_API_KEY"),
|
||||
system_instruction=system_instruction,
|
||||
tools=tools,
|
||||
)
|
||||
|
||||
context = OpenAILLMContext(
|
||||
[
|
||||
{
|
||||
"role": "user",
|
||||
"content": "Start by greeting the user warmly, introducing yourself, and mentioning the current day. Be friendly and engaging to set a positive tone for the interaction.",
|
||||
}
|
||||
],
|
||||
)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
|
||||
llm_search_logger = LLMSearchLoggerProcessor()
|
||||
|
||||
#
|
||||
# RTVI events for Pipecat client UI
|
||||
#
|
||||
rtvi = RTVIProcessor(config=RTVIConfig(config=[]))
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(),
|
||||
stt,
|
||||
rtvi,
|
||||
context_aggregator.user(),
|
||||
llm,
|
||||
llm_search_logger,
|
||||
tts,
|
||||
transport.output(),
|
||||
context_aggregator.assistant(),
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
PipelineParams(
|
||||
allow_interruptions=True,
|
||||
observers=[rtvi.observer()],
|
||||
),
|
||||
)
|
||||
|
||||
@rtvi.event_handler("on_client_ready")
|
||||
async def on_client_ready(rtvi):
|
||||
await rtvi.set_bot_ready()
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
|
||||
@transport.event_handler("on_participant_left")
|
||||
async def on_participant_left(transport, participant, reason):
|
||||
print(f"Participant left: {participant}")
|
||||
await task.queue_frame(EndFrame())
|
||||
|
||||
runner = PipelineRunner()
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
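
The dynamic retrieval comments in `news_bot.py` describe a threshold in the range [0, 1], where 0 always grounds the response and 1 never does. A small sketch of how that configuration could be parameterised is shown here; `build_search_tool` is a hypothetical helper, and only the dictionary layout is taken from the file above.

```python
def build_search_tool(dynamic_threshold: float = 0.3) -> dict:
    """Return a search tool config with the given threshold (hypothetical helper).

    0 means every response is grounded with Google Search, 1 means none are.
    The default of 0.3 mirrors the default mentioned in the comments above.
    """
    if not 0.0 <= dynamic_threshold <= 1.0:
        raise ValueError("dynamic_threshold must be in the range [0, 1]")

    return {
        "google_search_retrieval": {
            "dynamic_retrieval_config": {
                "mode": "MODE_DYNAMIC",
                "dynamic_threshold": dynamic_threshold,
            }
        }
    }


# Equivalent to the always-grounded configuration used in news_bot.py.
tools = [build_search_tool(0)]
```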
|
||||
4
examples/news-chatbot/server/requirements.txt
Normal file
@@ -0,0 +1,4 @@
|
||||
python-dotenv
|
||||
fastapi[all]
|
||||
uvicorn
|
||||
pipecat-ai[daily,google,deepgram,cartesia,silero,openai]
|
||||
63
examples/news-chatbot/server/runner.py
Normal file
@@ -0,0 +1,63 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import argparse
|
||||
import os
|
||||
|
||||
import aiohttp
|
||||
|
||||
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper
|
||||
|
||||
|
||||
async def configure(aiohttp_session: aiohttp.ClientSession):
|
||||
(url, token, _) = await configure_with_args(aiohttp_session)
|
||||
return (url, token)
|
||||
|
||||
|
||||
async def configure_with_args(
|
||||
aiohttp_session: aiohttp.ClientSession, parser: argparse.ArgumentParser | None = None
|
||||
):
|
||||
if not parser:
|
||||
parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
|
||||
parser.add_argument(
|
||||
"-u", "--url", type=str, required=False, help="URL of the Daily room to join"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-k",
|
||||
"--apikey",
|
||||
type=str,
|
||||
required=False,
|
||||
help="Daily API Key (needed to create an owner token for the room)",
|
||||
)
|
||||
|
||||
args, unknown = parser.parse_known_args()
|
||||
|
||||
url = args.url or os.getenv("DAILY_SAMPLE_ROOM_URL")
|
||||
key = args.apikey or os.getenv("DAILY_API_KEY")
|
||||
|
||||
if not url:
|
||||
raise Exception(
|
||||
"No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL."
|
||||
)
|
||||
|
||||
if not key:
|
||||
raise Exception(
|
||||
"No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers."
|
||||
)
|
||||
|
||||
daily_rest_helper = DailyRESTHelper(
|
||||
daily_api_key=key,
|
||||
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
|
||||
aiohttp_session=aiohttp_session,
|
||||
)
|
||||
|
||||
# Create a meeting token for the given room with an expiration 1 hour in
|
||||
# the future.
|
||||
expiry_time: float = 60 * 60
|
||||
|
||||
token = await daily_rest_helper.get_token(url, expiry_time)
|
||||
|
||||
return (url, token, args)
|
||||
147
examples/news-chatbot/server/server.py
Normal file
@@ -0,0 +1,147 @@
|
||||
#
|
||||
# Copyright (c) 2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import subprocess
|
||||
from contextlib import asynccontextmanager
|
||||
from typing import Any, Dict
|
||||
|
||||
import aiohttp
|
||||
from dotenv import load_dotenv
|
||||
from fastapi import FastAPI, HTTPException, Request
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from fastapi.responses import JSONResponse
|
||||
|
||||
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomParams
|
||||
|
||||
# Load environment variables from .env file
|
||||
load_dotenv(override=True)
|
||||
|
||||
# Dictionary to track bot processes: {pid: (process, room_url)}
|
||||
bot_procs = {}
|
||||
|
||||
# Store Daily API helpers
|
||||
daily_helpers = {}
|
||||
|
||||
|
||||
def cleanup():
|
||||
"""Cleanup function to terminate all bot processes.
|
||||
|
||||
Called during server shutdown.
|
||||
"""
|
||||
for entry in bot_procs.values():
|
||||
proc = entry[0]
|
||||
proc.terminate()
|
||||
proc.wait()
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI):
|
||||
"""FastAPI lifespan manager that handles startup and shutdown tasks.
|
||||
|
||||
- Creates aiohttp session
|
||||
- Initializes Daily API helper
|
||||
- Cleans up resources on shutdown
|
||||
"""
|
||||
aiohttp_session = aiohttp.ClientSession()
|
||||
daily_helpers["rest"] = DailyRESTHelper(
|
||||
daily_api_key=os.getenv("DAILY_API_KEY", ""),
|
||||
daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
|
||||
aiohttp_session=aiohttp_session,
|
||||
)
|
||||
yield
|
||||
await aiohttp_session.close()
|
||||
cleanup()
|
||||
|
||||
|
||||
# Initialize FastAPI app with lifespan manager
|
||||
app = FastAPI(lifespan=lifespan)
|
||||
|
||||
# Configure CORS to allow requests from any origin
|
||||
app.add_middleware(
|
||||
CORSMiddleware,
|
||||
allow_origins=["*"],
|
||||
allow_credentials=True,
|
||||
allow_methods=["*"],
|
||||
allow_headers=["*"],
|
||||
)
|
||||
|
||||
|
||||
async def create_room_and_token() -> tuple[str, str]:
|
||||
"""Helper function to create a Daily room and generate an access token.
|
||||
|
||||
Returns:
|
||||
tuple[str, str]: A tuple containing (room_url, token)
|
||||
|
||||
Raises:
|
||||
HTTPException: If room creation or token generation fails
|
||||
"""
|
||||
room = await daily_helpers["rest"].create_room(DailyRoomParams())
|
||||
if not room.url:
|
||||
raise HTTPException(status_code=500, detail="Failed to create room")
|
||||
|
||||
token = await daily_helpers["rest"].get_token(room.url)
|
||||
if not token:
|
||||
raise HTTPException(status_code=500, detail=f"Failed to get token for room: {room.url}")
|
||||
|
||||
return room.url, token
|
||||
|
||||
|
||||
@app.post("/connect")
|
||||
async def bot_connect(request: Request) -> Dict[Any, Any]:
|
||||
"""Connect endpoint that creates a room and returns connection credentials.
|
||||
|
||||
This endpoint is called by client to establish a connection.
|
||||
|
||||
Returns:
|
||||
Dict[Any, Any]: Authentication bundle containing room_url and token
|
||||
|
||||
Raises:
|
||||
HTTPException: If room creation, token generation, or bot startup fails
|
||||
"""
|
||||
print("Creating room for RTVI connection")
|
||||
room_url, token = await create_room_and_token()
|
||||
print(f"Room URL: {room_url}")
|
||||
|
||||
# Start the bot process
|
||||
try:
|
||||
bot_file = "news_bot"
|
||||
proc = subprocess.Popen(
|
||||
[f"python3 -m {bot_file} -u {room_url} -t {token}"],
|
||||
shell=True,
|
||||
bufsize=1,
|
||||
cwd=os.path.dirname(os.path.abspath(__file__)),
|
||||
)
|
||||
bot_procs[proc.pid] = (proc, room_url)
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=f"Failed to start subprocess: {e}")
|
||||
|
||||
# Return the authentication bundle in format expected by DailyTransport
|
||||
return {"room_url": room_url, "token": token}
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import uvicorn
|
||||
|
||||
# Parse command line arguments for server configuration
|
||||
default_host = os.getenv("HOST", "0.0.0.0")
|
||||
default_port = int(os.getenv("FAST_API_PORT", "7860"))
|
||||
|
||||
parser = argparse.ArgumentParser(description="News Chatbot FastAPI server")
|
||||
parser.add_argument("--host", type=str, default=default_host, help="Host address")
|
||||
parser.add_argument("--port", type=int, default=default_port, help="Port number")
|
||||
parser.add_argument("--reload", action="store_true", help="Reload code on change")
|
||||
|
||||
config = parser.parse_args()
|
||||
|
||||
# Start the FastAPI server
|
||||
uvicorn.run(
|
||||
"server:app",
|
||||
host=config.host,
|
||||
port=config.port,
|
||||
reload=config.reload,
|
||||
)
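
`bot_connect` starts the bot by interpolating the room URL and token into a single shell string. One possible alternative, sketched here under assumptions, is to pass an argument list so that no shell parsing is involved. `build_bot_command` and `start_bot` are hypothetical helpers, and using `sys.executable` instead of `python3` is a substitution made for this example.

```python
import subprocess
import sys


def build_bot_command(bot_module: str, room_url: str, token: str) -> list[str]:
    """Build the argv list for launching a bot module (hypothetical helper).

    Each value is its own list element, so it is never interpreted by a shell.
    """
    return [sys.executable, "-m", bot_module, "-u", room_url, "-t", token]


def start_bot(bot_module: str, room_url: str, token: str, cwd: str) -> subprocess.Popen:
    """Launch the bot as a subprocess without shell=True."""
    return subprocess.Popen(build_bot_command(bot_module, room_url, token), cwd=cwd)
```

The returned `Popen` object can be stored in `bot_procs` keyed by `proc.pid`, as the endpoint already does.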
|
||||
149
examples/phone-chatbot/README.md
Normal file
@@ -0,0 +1,149 @@
|
||||
<div align="center">
|
||||
<img alt="pipecat" width="300px" height="auto" src="image.png">
|
||||
</div>
|
||||
|
||||
# Phone Chatbot
|
||||
|
||||
Example project that demonstrates how to add phone functionality to your Pipecat bots. We include examples for Daily (`bot_daily.py`) dial-in and dial-out, and Twilio (`bot_twilio.py`) dial-in, depending on which phone vendor you want to use.
|
||||
|
||||
- 🔁 Transport: Daily WebRTC
|
||||
- 💬 Speech-to-Text: Deepgram via Daily transport
|
||||
- 🤖 LLM: GPT-4o / OpenAI
|
||||
- 🔉 Text-to-Speech: ElevenLabs
|
||||
|
||||
#### Should I use Daily or Twilio as a vendor?
|
||||
|
||||
If you're starting from scratch, using Daily to provision phone numbers alongside Daily as a transport offers some convenience (such as automatic call forwarding).
|
||||
|
||||
If you already have Twilio numbers and workflows that you want to connect to your Pipecat bots, there is some additional configuration required (you'll need to create an `on_dialin_ready` handler and use the Twilio client to trigger the forward).
|
||||
|
||||
You can read more about this, and find walkthroughs for each vendor, in our docs.
|
||||
|
||||
## Setup
|
||||
|
||||
1. Create and activate a virtual environment:
|
||||
```shell
|
||||
python3 -m venv venv
|
||||
source venv/bin/activate # On Windows: venv\Scripts\activate
|
||||
```
|
||||
2. Install requirements:
|
||||
```shell
|
||||
pip install -r requirements.txt
|
||||
```
|
||||
3. Copy env.example to .env and configure:
|
||||
```shell
|
||||
cp env.example .env
|
||||
```
|
||||
4. Install [ngrok](https://ngrok.com/) so your local server can receive requests from Daily's servers.
|
||||
|
||||
## Using Daily numbers
|
||||
|
||||
### Running the example
|
||||
|
||||
To run either the dial-in or dial-out example, follow these steps to get started:
|
||||
|
||||
1. Run `bot_runner.py` to handle incoming HTTP requests:
|
||||
|
||||
```shell
|
||||
python bot_runner.py --host localhost
|
||||
```
|
||||
|
||||
2. Start ngrok running in a terminal window:
|
||||
|
||||
```shell
|
||||
ngrok http --domain yourdomain.ngrok.app 8000
|
||||
```
|
||||
|
||||
3. In a different terminal window, run the Daily bot file:
|
||||
```shell
|
||||
python bot_daily.py
|
||||
```
|
||||
|
||||
### Dial-in
|
||||
|
||||
To dial-in to the bot, you will need to enable dial-in for your Daily domain. Follow [this guide](https://docs.daily.co/guides/products/dial-in-dial-out/dialin-pinless#provisioning-sip-interconnect-and-pinless-dialin-workflow) to set up your domain.
|
||||
|
||||
Note: For the `room_creation_api` property, point at your ngrok hostname: `"room_creation_api": "https://yourdomain.ngrok.app/daily_start_bot"`.
|
||||
|
||||
Once your domain is configured, receiving a phone call at a number associated with your Daily account will result in a POST to the `/daily_start_bot` endpoint, which will start a bot session.
|
||||
|
||||
### Dial-out
|
||||
|
||||
For the bot to dial out to a number, make a POST request to `/daily_start_bot` and include the dial-out phone number in the body of the request as `dialoutNumber`.
|
||||
|
||||
For example:
|
||||
|
||||
```shell
|
||||
curl -X "POST" "http://localhost:7860/daily_start_bot" \
|
||||
-H 'Content-Type: application/json; charset=utf-8' \
|
||||
-d $'{
|
||||
"dialoutNumber": "+12125551234"
|
||||
}'
|
||||
```
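
The same request can be made from Python. The sketch below uses only the standard library; `build_dialout_request` and `start_dialout_bot` are hypothetical helpers, and the base URL simply mirrors the shell example above.

```python
import json
import urllib.request


def build_dialout_request(
    phone_number: str, base_url: str = "http://localhost:7860"
) -> urllib.request.Request:
    """Build (but do not send) the dial-out POST request (hypothetical helper)."""
    payload = json.dumps({"dialoutNumber": phone_number}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/daily_start_bot",
        data=payload,
        headers={"Content-Type": "application/json; charset=utf-8"},
        method="POST",
    )


def start_dialout_bot(phone_number: str, base_url: str = "http://localhost:7860") -> dict:
    """Send the request and return the parsed JSON response."""
    request = build_dialout_request(phone_number, base_url)
    with urllib.request.urlopen(request, timeout=30) as response:
        return json.loads(response.read().decode("utf-8"))
```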
|
||||
|
||||
### More information
|
||||
|
||||
For more configuration options, please consult [Daily's API documentation](https://docs.daily.co).
|
||||
|
||||
## Using Twilio numbers
|
||||
|
||||
### Running the example
|
||||
|
||||
Follow these steps to get started:
|
||||
|
||||
1. Run `bot_runner.py` to handle incoming HTTP requests:
|
||||
|
||||
```shell
|
||||
python bot_runner.py --host localhost
|
||||
```
|
||||
|
||||
2. Start ngrok running in a terminal window:
|
||||
|
||||
```shell
|
||||
ngrok http --domain yourdomain.ngrok.app 8000
|
||||
```
|
||||
|
||||
3. In a different terminal window, run the Twilio bot file:
|
||||
```shell
|
||||
python bot_twilio.py
|
||||
```
|
||||
|
||||
The dial-in flow is the same as in the Daily section above, but requests should target the following URL:
|
||||
|
||||
`POST /twilio_start_bot`
|
||||
|
||||
For more configuration options, please consult Twilio's API documentation.
|
||||
|
||||
## Deployment example
|
||||
|
||||
A Dockerfile is included in this demo for convenience. Here is an example of how to build and deploy your bot to [fly.io](https://fly.io).
|
||||
|
||||
_Please note: This demo spawns agents as subprocesses for convenience / demonstration purposes. You would likely not want to do this in production as it would limit concurrency to available system resources. For more information on how to deploy your bots using VMs, refer to the Pipecat documentation._
|
||||
|
||||
### Build the docker image
|
||||
|
||||
`docker build -t tag:project .`
|
||||
|
||||
### Launch the fly project
|
||||
|
||||
`mv fly.example.toml fly.toml`
|
||||
|
||||
`fly launch` (using the included fly.toml)
|
||||
|
||||
### Set up your secrets on Fly
|
||||
|
||||
Set the necessary secrets (found in `env.example`)
|
||||
|
||||
`fly secrets set DAILY_API_KEY=... OPENAI_API_KEY=... ELEVENLABS_API_KEY=... ELEVENLABS_VOICE_ID=...`
|
||||
|
||||
If you're using Twilio as a number vendor:
|
||||
|
||||
`fly secrets set TWILIO_ACCOUNT_SID=... TWILIO_AUTH_TOKEN=...`
|
||||
|
||||
### Deploy!
|
||||
|
||||
`fly deploy`
|
||||
|
||||
## Need to do something more advanced?
|
||||
|
||||
This demo covers the basics of bot telephony. If you want to know more about working with PSTN / SIP, please ping us on [Discord](https://discord.gg/pipecat)!
|
||||
@@ -25,12 +25,11 @@ daily_api_key = os.getenv("DAILY_API_KEY", "")
|
||||
daily_api_url = os.getenv("DAILY_API_URL", "https://api.daily.co/v1")
|
||||
|
||||
|
||||
async def main(room_url: str, token: str, callId: str, callDomain: str):
|
||||
# diallin_settings are only needed if Daily's SIP URI is used
|
||||
async def main(room_url: str, token: str, callId: str, callDomain: str, dialout_number: str | None):
|
||||
# dialin_settings are only needed if Daily's SIP URI is used
|
||||
# If you are handling this via Twilio, Telnyx, set this to None
|
||||
# and handle call-forwarding when on_dialin_ready fires.
|
||||
diallin_settings = DailyDialinSettings(call_id=callId, call_domain=callDomain)
|
||||
|
||||
dialin_settings = DailyDialinSettings(call_id=callId, call_domain=callDomain)
|
||||
transport = DailyTransport(
|
||||
room_url,
|
||||
token,
|
||||
@@ -38,7 +37,7 @@ async def main(room_url: str, token: str, callId: str, callDomain: str):
|
||||
DailyParams(
|
||||
api_url=daily_api_url,
|
||||
api_key=daily_api_key,
|
||||
dialin_settings=diallin_settings,
|
||||
dialin_settings=dialin_settings,
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
camera_out_enabled=False,
|
||||
@@ -58,7 +57,7 @@ async def main(room_url: str, token: str, callId: str, callDomain: str):
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by saying 'Oh, hello! Who dares dial me at this hour?!'.",
|
||||
"content": "You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by saying 'Oh, hello! I'm a friendly chatbot. How can I help you?'.",
|
||||
},
|
||||
]
|
||||
|
||||
@@ -78,10 +77,41 @@ async def main(room_url: str, token: str, callId: str, callDomain: str):
|
||||
|
||||
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
if dialout_number:
|
||||
logger.debug("dialout number detected; doing dialout")
|
||||
|
||||
# Configure some handlers for dialing out
|
||||
@transport.event_handler("on_joined")
|
||||
async def on_joined(transport, data):
|
||||
logger.debug(f"Joined; starting dialout to: {dialout_number}")
|
||||
await transport.start_dialout({"phoneNumber": dialout_number})
|
||||
|
||||
@transport.event_handler("on_dialout_connected")
|
||||
async def on_dialout_connected(transport, data):
|
||||
logger.debug(f"Dial-out connected: {data}")
|
||||
|
||||
@transport.event_handler("on_dialout_answered")
|
||||
async def on_dialout_answered(transport, data):
|
||||
logger.debug(f"Dial-out answered: {data}")
|
||||
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# Unlike the dialin case, for the dialout case, the callee will speak first. Presumably
|
||||
# they will answer the phone and say "Hello?" Since we've captured their transcript,
|
||||
# that will put a frame into the pipeline and prompt an LLM completion, which is how the
|
||||
# bot will then greet the user.
|
||||
|
||||
else:
|
||||
logger.debug("no dialout number; assuming dialin")
|
||||
|
||||
# Different handlers for dialin
|
||||
@transport.event_handler("on_first_participant_joined")
|
||||
async def on_first_participant_joined(transport, participant):
|
||||
await transport.capture_participant_transcription(participant["id"])
|
||||
# For the dialin case, we want the bot to answer the phone and greet the user. We
|
||||
# can prompt the bot to speak by putting the context into the pipeline.
|
||||
await task.queue_frames([context_aggregator.user().get_context_frame()])
|
||||
|
||||
@transport.event_handler("on_participant_left")
|
||||
async def on_participant_left(transport, participant, reason):
|
||||
@@ -98,6 +128,7 @@ if __name__ == "__main__":
|
||||
parser.add_argument("-t", type=str, help="Token")
|
||||
parser.add_argument("-i", type=str, help="Call ID")
|
||||
parser.add_argument("-d", type=str, help="Call Domain")
|
||||
parser.add_argument("-o", type=str, help="Dialout number", default=None)
|
||||
config = parser.parse_args()
|
||||
|
||||
asyncio.run(main(config.u, config.t, config.i, config.d))
|
||||
asyncio.run(main(config.u, config.t, config.i, config.d, config.o))
|
||||
@@ -73,24 +73,27 @@ action using the Twilio Client library.
|
||||
"""
|
||||
|
||||
|
||||
async def _create_daily_room(room_url, callId, callDomain=None, vendor="daily"):
|
||||
async def _create_daily_room(room_url, callId, callDomain=None, dialoutNumber=None, vendor="daily"):
|
||||
if not room_url:
|
||||
params = DailyRoomParams(
|
||||
properties=DailyRoomProperties(
|
||||
# Note: these are the default values, except for the display name
|
||||
sip=DailyRoomSipParams(
|
||||
display_name="dialin-user", video=False, sip_mode="dial-in", num_endpoints=1
|
||||
)
|
||||
# Create base properties with SIP settings
|
||||
properties = DailyRoomProperties(
|
||||
sip=DailyRoomSipParams(
|
||||
display_name="dialin-user", video=False, sip_mode="dial-in", num_endpoints=1
|
||||
)
|
||||
)
|
||||
|
||||
# Only enable dialout if dialoutNumber is provided
|
||||
if dialoutNumber:
|
||||
properties.enable_dialout = True
|
||||
|
||||
params = DailyRoomParams(properties=properties)
|
||||
|
||||
print(f"Creating new room...")
|
||||
room: DailyRoomObject = await daily_helpers["rest"].create_room(params=params)
|
||||
|
||||
else:
|
||||
# Check that the passed room URL exists (we assume that it already has SIP set up!)
|
||||
try:
|
||||
print(f"Joining existing room: {room_url}")
|
||||
room: DailyRoomObject = await daily_helpers["rest"].get_room_from_url(room_url)
|
||||
except Exception:
|
||||
raise HTTPException(status_code=500, detail=f"Room not found: {room_url}")
|
||||
@@ -107,6 +110,8 @@ async def _create_daily_room(room_url, callId, callDomain=None, vendor="daily"):
|
||||
# Note: this is mostly for demonstration purposes (refer to 'deployment' in docs)
|
||||
if vendor == "daily":
|
||||
bot_proc = f"python3 -m bot_daily -u {room.url} -t {token} -i {callId} -d {callDomain}"
|
||||
if dialoutNumber:
|
||||
bot_proc += f" -o {dialoutNumber}"
|
||||
else:
|
||||
bot_proc = f"python3 -m bot_twilio -u {room.url} -t {token} -i {callId} -s {room.config.sip_endpoint}"
|
||||
|
||||
@@ -179,11 +184,15 @@ async def daily_start_bot(request: Request) -> JSONResponse:
|
||||
return JSONResponse({"test": True})
|
||||
callId = data.get("callId", None)
|
||||
callDomain = data.get("callDomain", None)
|
||||
dialoutNumber = data.get("dialoutNumber", None)
|
||||
except Exception:
|
||||
raise HTTPException(status_code=500, detail="Missing properties 'callId' or 'callDomain'")
|
||||
raise HTTPException(
|
||||
status_code=500, detail="Missing properties 'callId', 'callDomain', or 'dialoutNumber'"
|
||||
)
|
||||
|
||||
print(f"CallId: {callId}, CallDomain: {callDomain}")
|
||||
room: DailyRoomObject = await _create_daily_room(room_url, callId, callDomain, "daily")
|
||||
room: DailyRoomObject = await _create_daily_room(
|
||||
room_url, callId, callDomain, dialoutNumber, "daily"
|
||||
)
|
||||
|
||||
# Grab a token for the user to join with
|
||||
return JSONResponse({"room_url": room.url, "sipUri": room.config.sip_endpoint})
|
||||
|
@@ -159,5 +159,5 @@ class SileroVADAnalyzer(VADAnalyzer):
|
||||
return new_confidence
|
||||
except Exception as e:
|
||||
# This comes from an empty audio array
|
||||
logger.exception(f"Error analyzing audio with Silero VAD: {e}")
|
||||
logger.error(f"Error analyzing audio with Silero VAD: {e}")
|
||||
return 0
|
||||
|
||||
@@ -11,6 +11,18 @@ from pipecat.frames.frames import Frame
|
||||
|
||||
|
||||
class BaseTask(ABC):
|
||||
@property
|
||||
@abstractmethod
|
||||
def id(self) -> int:
|
||||
"""Returns the unique identifier for this task."""
|
||||
pass
|
||||
|
||||
@property
|
||||
@abstractmethod
|
||||
def name(self) -> str:
|
||||
"""Returns the name of this task."""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def has_finished(self) -> bool:
|
||||
"""Indicates whether the task has finished. That is, all processors
|
||||
|
||||
@@ -23,7 +23,7 @@ from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
|
||||
|
||||
class Source(FrameProcessor):
|
||||
class ParallelPipelineSource(FrameProcessor):
|
||||
def __init__(
|
||||
self,
|
||||
upstream_queue: asyncio.Queue,
|
||||
@@ -46,7 +46,7 @@ class Source(FrameProcessor):
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
|
||||
class Sink(FrameProcessor):
|
||||
class ParallelPipelineSink(FrameProcessor):
|
||||
def __init__(
|
||||
self,
|
||||
downstream_queue: asyncio.Queue,
|
||||
@@ -92,8 +92,8 @@ class ParallelPipeline(BasePipeline):
|
||||
raise TypeError(f"ParallelPipeline argument {processors} is not a list")
|
||||
|
||||
# We will add a source before the pipeline and a sink after.
|
||||
source = Source(self._up_queue, self._parallel_push_frame)
|
||||
sink = Sink(self._down_queue, self._parallel_push_frame)
|
||||
source = ParallelPipelineSource(self._up_queue, self._parallel_push_frame)
|
||||
sink = ParallelPipelineSink(self._down_queue, self._parallel_push_frame)
|
||||
self._sources.append(source)
|
||||
self._sinks.append(sink)
|
||||
|
||||
@@ -117,6 +117,7 @@ class ParallelPipeline(BasePipeline):
|
||||
#
|
||||
|
||||
async def cleanup(self):
|
||||
await super().cleanup()
|
||||
await asyncio.gather(*[s.cleanup() for s in self._sources])
|
||||
await asyncio.gather(*[p.cleanup() for p in self._pipelines])
|
||||
await asyncio.gather(*[s.cleanup() for s in self._sinks])
|
||||
@@ -150,22 +151,18 @@ class ParallelPipeline(BasePipeline):
|
||||
|
||||
async def _stop(self):
|
||||
# The up task doesn't receive an EndFrame, so we just cancel it.
|
||||
self._up_task.cancel()
|
||||
await self._up_task
|
||||
# The down tasks waits for the last EndFrame send by the internal
|
||||
await self.cancel_task(self._up_task)
|
||||
# The down task waits for the last EndFrame sent by the internal
|
||||
# pipelines.
|
||||
await self._down_task
|
||||
|
||||
async def _cancel(self):
|
||||
self._up_task.cancel()
|
||||
await self._up_task
|
||||
self._down_task.cancel()
|
||||
await self._down_task
|
||||
await self.cancel_task(self._up_task)
|
||||
await self.cancel_task(self._down_task)
|
||||
|
||||
async def _create_tasks(self):
|
||||
loop = self.get_event_loop()
|
||||
self._up_task = loop.create_task(self._process_up_queue())
|
||||
self._down_task = loop.create_task(self._process_down_queue())
|
||||
self._up_task = self.create_task(self._process_up_queue())
|
||||
self._down_task = self.create_task(self._process_down_queue())
|
||||
|
||||
async def _drain_queues(self):
|
||||
while not self._up_queue.empty:
|
||||
@@ -185,32 +182,26 @@ class ParallelPipeline(BasePipeline):
|
||||
|
||||
async def _process_up_queue(self):
|
||||
while True:
|
||||
try:
|
||||
frame = await self._up_queue.get()
|
||||
await self._parallel_push_frame(frame, FrameDirection.UPSTREAM)
|
||||
self._up_queue.task_done()
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
frame = await self._up_queue.get()
|
||||
await self._parallel_push_frame(frame, FrameDirection.UPSTREAM)
|
||||
self._up_queue.task_done()
|
||||
|
||||
async def _process_down_queue(self):
|
||||
running = True
|
||||
while running:
|
||||
try:
|
||||
frame = await self._down_queue.get()
|
||||
frame = await self._down_queue.get()
|
||||
|
||||
endframe_counter = self._endframe_counter.get(frame.id, 0)
|
||||
endframe_counter = self._endframe_counter.get(frame.id, 0)
|
||||
|
||||
# If we have a counter, decrement it.
|
||||
if endframe_counter > 0:
|
||||
self._endframe_counter[frame.id] -= 1
|
||||
endframe_counter = self._endframe_counter[frame.id]
|
||||
# If we have a counter, decrement it.
|
||||
if endframe_counter > 0:
|
||||
self._endframe_counter[frame.id] -= 1
|
||||
endframe_counter = self._endframe_counter[frame.id]
|
||||
|
||||
# If we don't have a counter or we reached 0, push the frame.
|
||||
if endframe_counter == 0:
|
||||
await self._parallel_push_frame(frame, FrameDirection.DOWNSTREAM)
|
||||
# If we don't have a counter or we reached 0, push the frame.
|
||||
if endframe_counter == 0:
|
||||
await self._parallel_push_frame(frame, FrameDirection.DOWNSTREAM)
|
||||
|
||||
running = not (endframe_counter == 0 and isinstance(frame, EndFrame))
|
||||
running = not (endframe_counter == 0 and isinstance(frame, EndFrame))
|
||||
|
||||
self._down_queue.task_done()
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
self._down_queue.task_done()
|
||||
|
||||
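Review note: the down-queue logic above pushes an `EndFrame` only after every parallel branch has delivered it. A minimal sketch of that counting rule follows, assuming the counter for a frame id is initialised elsewhere to the number of branches (that initialisation is not shown in this hunk). `should_push` and `simulate` are hypothetical names.

```python
from typing import Dict, List


def should_push(counters: Dict[int, int], frame_id: int) -> bool:
    """Return True when the frame should be pushed downstream."""
    remaining = counters.get(frame_id, 0)
    # If we have a counter, decrement it.
    if remaining > 0:
        counters[frame_id] -= 1
        remaining = counters[frame_id]
    # No counter, or the counter reached 0: push the frame.
    return remaining == 0


def simulate(num_branches: int, frame_id: int = 1) -> List[bool]:
    # Assumption: the counter starts at the number of parallel branches.
    counters = {frame_id: num_branches}
    return [should_push(counters, frame_id) for _ in range(num_branches)]
```

Frames without a counter entry pass straight through, so only frames registered for counting are held back until the last branch reports.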
@@ -71,6 +71,7 @@ class Pipeline(BasePipeline):
|
||||
#
|
||||
|
||||
async def cleanup(self):
|
||||
await super().cleanup()
|
||||
await self._cleanup_processors()
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
|
||||
@@ -10,6 +10,7 @@ import signal
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.pipeline.task import PipelineTask
|
||||
from pipecat.utils.asyncio import current_tasks
|
||||
from pipecat.utils.utils import obj_count, obj_id
|
||||
|
||||
|
||||
@@ -19,6 +20,7 @@ class PipelineRunner:
|
||||
self.name: str = name or f"{self.__class__.__name__}#{obj_count(self)}"
|
||||
|
||||
self._tasks = {}
|
||||
self._sig_task = None
|
||||
|
||||
if handle_sigint:
|
||||
self._setup_sigint()
|
||||
@@ -28,6 +30,11 @@ class PipelineRunner:
|
||||
self._tasks[task.name] = task
|
||||
await task.run()
|
||||
del self._tasks[task.name]
|
||||
# If we are cancelling through a signal, make sure we wait for it so
|
||||
# everything gets cleaned up nicely.
|
||||
if self._sig_task:
|
||||
await self._sig_task
|
||||
self._print_dangling_tasks()
|
||||
logger.debug(f"Runner {self} finished running {task}")
|
||||
|
||||
async def stop_when_done(self):
|
||||
@@ -40,16 +47,21 @@ class PipelineRunner:
|
||||
|
||||
def _setup_sigint(self):
|
||||
loop = asyncio.get_running_loop()
|
||||
loop.add_signal_handler(
|
||||
signal.SIGINT, lambda *args: asyncio.create_task(self._sig_handler())
|
||||
)
|
||||
loop.add_signal_handler(
|
||||
signal.SIGTERM, lambda *args: asyncio.create_task(self._sig_handler())
|
||||
)
|
||||
loop.add_signal_handler(signal.SIGINT, lambda *args: self._sig_handler())
|
||||
loop.add_signal_handler(signal.SIGTERM, lambda *args: self._sig_handler())
|
||||
|
||||
async def _sig_handler(self):
|
||||
def _sig_handler(self):
|
||||
if not self._sig_task:
|
||||
self._sig_task = asyncio.create_task(self._sig_cancel())
|
||||
|
||||
async def _sig_cancel(self):
|
||||
logger.warning(f"Interruption detected. Canceling runner {self}")
|
||||
await self.cancel()
|
||||
|
||||
def _print_dangling_tasks(self):
|
||||
tasks = [t.get_name() for t in current_tasks()]
|
||||
if tasks:
|
||||
logger.warning(f"Dangling tasks detected: {tasks}")
|
||||
|
||||
def __str__(self):
|
||||
return self.name
|
||||
|
||||
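Review note: the signal handler above is now a plain function that schedules the cancellation coroutine at most once, and `run()` awaits that task before returning. The sketch below mimics the pattern without registering real signal handlers; `RunnerSketch` and its members are hypothetical, and the two direct calls to `sig_handler()` stand in for two signals arriving in a row.

```python
import asyncio
from typing import Optional


class RunnerSketch:
    """Hypothetical stand-in for the runner's signal handling."""

    def __init__(self) -> None:
        self._sig_task: Optional[asyncio.Task] = None
        self.cancel_calls = 0

    def sig_handler(self) -> None:
        # Plain function, so it can be passed to loop.add_signal_handler().
        # Only the first signal schedules the cancellation.
        if not self._sig_task:
            self._sig_task = asyncio.create_task(self._sig_cancel())

    async def _sig_cancel(self) -> None:
        self.cancel_calls += 1
        await asyncio.sleep(0)  # placeholder for the real cancellation work

    async def run(self) -> None:
        # Simulate two signals arriving back to back.
        self.sig_handler()
        self.sig_handler()
        # Wait for the cancellation so everything is cleaned up.
        if self._sig_task:
            await self._sig_task
```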
@@ -24,7 +24,7 @@ class SyncFrame(ControlFrame):
|
||||
pass
|
||||
|
||||
|
||||
class Source(FrameProcessor):
|
||||
class SyncParallelPipelineSource(FrameProcessor):
|
||||
def __init__(self, upstream_queue: asyncio.Queue):
|
||||
super().__init__()
|
||||
self._up_queue = upstream_queue
|
||||
@@ -39,7 +39,7 @@ class Source(FrameProcessor):
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
|
||||
class Sink(FrameProcessor):
|
||||
class SyncParallelPipelineSink(FrameProcessor):
|
||||
def __init__(self, downstream_queue: asyncio.Queue):
|
||||
super().__init__()
|
||||
self._down_queue = downstream_queue
|
||||
@@ -76,8 +76,8 @@ class SyncParallelPipeline(BasePipeline):
|
||||
# We add a source at the beginning of the pipeline and a sink at the end.
|
||||
up_queue = asyncio.Queue()
|
||||
down_queue = asyncio.Queue()
|
||||
source = Source(up_queue)
|
||||
sink = Sink(down_queue)
|
||||
source = SyncParallelPipelineSource(up_queue)
|
||||
sink = SyncParallelPipelineSink(down_queue)
|
||||
processors: List[FrameProcessor] = [source] + processors + [sink]
|
||||
|
||||
# Keep track of sources and sinks. We also keep the output queue of
|
||||
@@ -101,6 +101,10 @@ class SyncParallelPipeline(BasePipeline):
|
||||
# Frame processor
|
||||
#
|
||||
|
||||
async def cleanup(self):
|
||||
await super().cleanup()
|
||||
await asyncio.gather(*[p.cleanup() for p in self._pipelines])
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
|
||||
@@ -30,6 +30,7 @@ from pipecat.pipeline.base_pipeline import BasePipeline
|
||||
from pipecat.pipeline.base_task import BaseTask
|
||||
from pipecat.pipeline.task_observer import TaskObserver
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.utils.asyncio import cancel_task, create_task, wait_for_task
|
||||
from pipecat.utils.utils import obj_count, obj_id
|
||||
|
||||
HEARTBEAT_SECONDS = 1.0
|
||||
@@ -49,7 +50,7 @@ class PipelineParams(BaseModel):
|
||||
heartbeats_period_secs: float = HEARTBEAT_SECONDS
|
||||
|
||||
|
||||
class Source(FrameProcessor):
|
||||
class PipelineTaskSource(FrameProcessor):
|
||||
"""This is the source processor that is linked at the beginning of the
|
||||
pipeline given to the pipeline task. It allows us to easily push frames
|
||||
downstream to the pipeline and also receive upstream frames coming from the
|
||||
@@ -57,8 +58,8 @@ class Source(FrameProcessor):
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, up_queue: asyncio.Queue):
|
||||
super().__init__()
|
||||
def __init__(self, up_queue: asyncio.Queue, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self._up_queue = up_queue
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
@@ -71,15 +72,15 @@ class Source(FrameProcessor):
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
|
||||
class Sink(FrameProcessor):
|
||||
class PipelineTaskSink(FrameProcessor):
|
||||
"""This is the sink processor that is linked at the end of the pipeline
|
||||
given to the pipeline task. It allows us to receive downstream frames and
|
||||
act on them, for example, waiting to receive an EndFrame.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, down_queue: asyncio.Queue):
|
||||
super().__init__()
|
||||
def __init__(self, down_queue: asyncio.Queue, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self._down_queue = down_queue
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
@@ -94,8 +95,8 @@ class PipelineTask(BaseTask):
|
||||
params: PipelineParams = PipelineParams(),
|
||||
clock: BaseClock = SystemClock(),
|
||||
):
|
||||
self.id: int = obj_id()
|
||||
self.name: str = f"{self.__class__.__name__}#{obj_count(self)}"
|
||||
self._id: int = obj_id()
|
||||
self._name: str = f"{self.__class__.__name__}#{obj_count(self)}"
|
||||
|
||||
self._pipeline = pipeline
|
||||
self._clock = clock
|
||||
@@ -115,14 +116,24 @@ class PipelineTask(BaseTask):
|
||||
# down queue.
|
||||
self._endframe_event = asyncio.Event()
|
||||
|
||||
self._source = Source(self._up_queue)
|
||||
self._source = PipelineTaskSource(self._up_queue)
|
||||
self._source.link(pipeline)
|
||||
|
||||
self._sink = Sink(self._down_queue)
|
||||
self._sink = PipelineTaskSink(self._down_queue)
|
||||
pipeline.link(self._sink)
|
||||
|
||||
self._observer = TaskObserver(params.observers)
|
||||
|
||||
@property
|
||||
def id(self) -> int:
|
||||
"""Returns the unique identifier for this task."""
|
||||
return self._id
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
"""Returns the name of this task."""
|
||||
return self._name
|
||||
|
||||
def has_finished(self) -> bool:
|
||||
"""Indicates whether the task has finished. That is, all processors
|
||||
have stopped.
|
||||
@@ -147,14 +158,24 @@ class PipelineTask(BaseTask):
|
||||
# out-of-band from the main streaming task which is what we want since
|
||||
# we want to cancel right away.
|
||||
await self._source.push_frame(CancelFrame())
|
||||
await self._cancel_tasks(True)
|
||||
# Only cancel the push task. Everything else will be cancelled in run().
|
||||
await cancel_task(self._process_push_task)
|
||||
await self._cleanup()
|
||||
|
||||
async def run(self):
|
||||
"""
|
||||
Starts running the given pipeline.
|
||||
"""
|
||||
tasks = self._create_tasks()
|
||||
await asyncio.gather(*tasks)
|
||||
try:
|
||||
push_task = self._create_tasks()
|
||||
await wait_for_task(push_task)
|
||||
except asyncio.CancelledError:
|
||||
# We are awaiting on the push task and it might be cancelled
|
||||
# (e.g. Ctrl-C). This means we will get a CancelledError here as
|
||||
# well, because you get a CancelledError in every place you are
|
||||
# awaiting a task.
|
||||
pass
|
||||
await self._cancel_tasks()
|
||||
self._finished = True
|
||||
|
||||
async def queue_frame(self, frame: Frame):
|
||||
@@ -175,41 +196,41 @@ class PipelineTask(BaseTask):
|
||||
await self.queue_frame(frame)
|
||||
|
||||
def _create_tasks(self):
|
||||
tasks = []
|
||||
self._process_up_task = asyncio.create_task(self._process_up_queue())
|
||||
self._process_down_task = asyncio.create_task(self._process_down_queue())
|
||||
self._process_push_task = asyncio.create_task(self._process_push_queue())
|
||||
loop = asyncio.get_running_loop()
|
||||
self._process_up_task = create_task(
|
||||
loop, self._process_up_queue(), f"{self}::_process_up_queue"
|
||||
)
|
||||
self._process_down_task = create_task(
|
||||
loop, self._process_down_queue(), f"{self}::_process_down_queue"
|
||||
)
|
||||
self._process_push_task = create_task(
|
||||
loop, self._process_push_queue(), f"{self}::_process_push_queue"
|
||||
)
|
||||
|
||||
tasks = [self._process_up_task, self._process_down_task, self._process_push_task]
|
||||
|
||||
return tasks
|
||||
return self._process_push_task
|
||||
|
||||
def _maybe_start_heartbeat_tasks(self):
|
||||
if self._params.enable_heartbeats:
|
||||
self._heartbeat_push_task = asyncio.create_task(self._heartbeat_push_handler())
|
||||
self._heartbeat_monitor_task = asyncio.create_task(self._heartbeat_monitor_handler())
|
||||
loop = asyncio.get_running_loop()
|
||||
self._heartbeat_push_task = create_task(
|
||||
loop, self._heartbeat_push_handler(), f"{self}::_heartbeat_push_handler"
|
||||
)
|
||||
self._heartbeat_monitor_task = create_task(
|
||||
loop, self._heartbeat_monitor_handler(), f"{self}::_heartbeat_monitor_handler"
|
||||
)
|
||||
|
||||
async def _cancel_tasks(self, cancel_push: bool):
|
||||
async def _cancel_tasks(self):
|
||||
await self._maybe_cancel_heartbeat_tasks()
|
||||
|
||||
if cancel_push:
|
||||
self._process_push_task.cancel()
|
||||
await self._process_push_task
|
||||
|
||||
self._process_up_task.cancel()
|
||||
await self._process_up_task
|
||||
|
||||
self._process_down_task.cancel()
|
||||
await self._process_down_task
|
||||
await cancel_task(self._process_up_task)
|
||||
await cancel_task(self._process_down_task)
|
||||
|
||||
await self._observer.stop()
|
||||
|
||||
async def _maybe_cancel_heartbeat_tasks(self):
|
||||
if self._params.enable_heartbeats:
|
||||
self._heartbeat_push_task.cancel()
|
||||
await self._heartbeat_push_task
|
||||
self._heartbeat_monitor_task.cancel()
|
||||
await self._heartbeat_monitor_task
|
||||
await cancel_task(self._heartbeat_push_task)
|
||||
await cancel_task(self._heartbeat_monitor_task)
|
||||
|
||||
def _initial_metrics_frame(self) -> MetricsFrame:
|
||||
processors = self._pipeline.processors_with_metrics()
|
||||
@@ -223,6 +244,11 @@ class PipelineTask(BaseTask):
|
||||
await self._endframe_event.wait()
|
||||
self._endframe_event.clear()
|
||||
|
||||
async def _cleanup(self):
|
||||
await self._source.cleanup()
|
||||
await self._pipeline.cleanup()
|
||||
await self._sink.cleanup()
|
||||
|
||||
async def _process_push_queue(self):
|
||||
"""This is the task that runs the pipeline for the first time by sending
|
||||
a StartFrame and by pushing any other frames queued by the user. It runs
|
||||
@@ -249,24 +275,16 @@ class PipelineTask(BaseTask):
|
||||
running = True
|
||||
should_cleanup = True
|
||||
while running:
|
||||
try:
|
||||
frame = await self._push_queue.get()
|
||||
await self._source.queue_frame(frame, FrameDirection.DOWNSTREAM)
|
||||
if isinstance(frame, EndFrame):
|
||||
await self._wait_for_endframe()
|
||||
running = not isinstance(frame, (StopTaskFrame, EndFrame))
|
||||
should_cleanup = not isinstance(frame, StopTaskFrame)
|
||||
self._push_queue.task_done()
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
frame = await self._push_queue.get()
|
||||
await self._source.queue_frame(frame, FrameDirection.DOWNSTREAM)
|
||||
if isinstance(frame, EndFrame):
|
||||
await self._wait_for_endframe()
|
||||
running = not isinstance(frame, (StopTaskFrame, EndFrame))
|
||||
should_cleanup = not isinstance(frame, StopTaskFrame)
|
||||
self._push_queue.task_done()
|
||||
# Cleanup only if we need to.
|
||||
if should_cleanup:
|
||||
await self._source.cleanup()
|
||||
await self._pipeline.cleanup()
|
||||
await self._sink.cleanup()
|
||||
# Finally, cancel internal tasks. We don't cancel the push tasks because
|
||||
# that's us.
|
||||
await self._cancel_tasks(False)
|
||||
await self._cleanup()
|
||||
|
||||
async def _process_up_queue(self):
|
||||
"""This is the task that processes frames coming upstream from the
|
||||
@@ -276,26 +294,23 @@ class PipelineTask(BaseTask):
|
||||
|
||||
"""
|
||||
while True:
|
||||
try:
|
||||
frame = await self._up_queue.get()
|
||||
if isinstance(frame, EndTaskFrame):
|
||||
# Tell the task we should end nicely.
|
||||
await self.queue_frame(EndFrame())
|
||||
elif isinstance(frame, CancelTaskFrame):
|
||||
# Tell the task we should end right away.
|
||||
frame = await self._up_queue.get()
|
||||
if isinstance(frame, EndTaskFrame):
|
||||
# Tell the task we should end nicely.
|
||||
await self.queue_frame(EndFrame())
|
||||
elif isinstance(frame, CancelTaskFrame):
|
||||
# Tell the task we should end right away.
|
||||
await self.queue_frame(CancelFrame())
|
||||
elif isinstance(frame, StopTaskFrame):
|
||||
await self.queue_frame(StopTaskFrame())
|
||||
elif isinstance(frame, ErrorFrame):
|
||||
logger.error(f"Error running app: {frame}")
|
||||
if frame.fatal:
|
||||
# Cancel all tasks downstream.
|
||||
await self.queue_frame(CancelFrame())
|
||||
elif isinstance(frame, StopTaskFrame):
|
||||
# Tell the task we should stop.
|
||||
await self.queue_frame(StopTaskFrame())
|
||||
elif isinstance(frame, ErrorFrame):
|
||||
logger.error(f"Error running app: {frame}")
|
||||
if frame.fatal:
|
||||
# Cancel all tasks downstream.
|
||||
await self.queue_frame(CancelFrame())
|
||||
# Tell the task we should stop.
|
||||
await self.queue_frame(StopTaskFrame())
|
||||
self._up_queue.task_done()
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
self._up_queue.task_done()
|
||||
|
||||
async def _process_down_queue(self):
|
||||
"""This task processes frames coming downstream from the pipeline. For
|
||||
@@ -305,29 +320,23 @@ class PipelineTask(BaseTask):
|
||||
|
||||
"""
|
||||
while True:
|
||||
try:
|
||||
frame = await self._down_queue.get()
|
||||
if isinstance(frame, EndFrame):
|
||||
self._endframe_event.set()
|
||||
elif isinstance(frame, HeartbeatFrame):
|
||||
await self._heartbeat_queue.put(frame)
|
||||
self._down_queue.task_done()
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
frame = await self._down_queue.get()
|
||||
if isinstance(frame, EndFrame):
|
||||
self._endframe_event.set()
|
||||
elif isinstance(frame, HeartbeatFrame):
|
||||
await self._heartbeat_queue.put(frame)
|
||||
self._down_queue.task_done()
|
||||
|
||||
async def _heartbeat_push_handler(self):
|
||||
"""
|
||||
This task pushes a heartbeat frame every heartbeat period.
|
||||
"""
|
||||
while True:
|
||||
try:
|
||||
# Don't use `queue_frame()` because if an EndFrame is queued the
|
||||
# task will just stop waiting for the pipeline to finish not
|
||||
# allowing more frames to be pushed.
|
||||
await self._source.queue_frame(HeartbeatFrame(timestamp=self._clock.get_time()))
|
||||
await asyncio.sleep(self._params.heartbeats_period_secs)
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
# Don't use `queue_frame()` because if an EndFrame is queued the
|
||||
# task will just stop waiting for the pipeline to finish not
|
||||
# allowing more frames to be pushed.
|
||||
await self._source.queue_frame(HeartbeatFrame(timestamp=self._clock.get_time()))
|
||||
await asyncio.sleep(self._params.heartbeats_period_secs)
|
||||
|
||||
async def _heartbeat_monitor_handler(self):
|
||||
"""This task monitors heartbeat frames. If a heartbeat frame has not
|
||||
@@ -347,8 +356,6 @@ class PipelineTask(BaseTask):
|
||||
logger.warning(
|
||||
f"{self}: heartbeat frame not received for more than {wait_time} seconds"
|
||||
)
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
|
||||
def __str__(self):
|
||||
return self.name
|
||||
|
||||
@@ -12,6 +12,8 @@ from attr import dataclass
|
||||
from pipecat.frames.frames import Frame
|
||||
from pipecat.observers.base_observer import BaseObserver
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.utils.asyncio import cancel_task, create_task
|
||||
from pipecat.utils.utils import obj_count, obj_id
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -54,13 +56,22 @@ class TaskObserver(BaseObserver):
|
||||
"""
|
||||
|
||||
def __init__(self, observers: List[BaseObserver] = []):
|
||||
self._id: int = obj_id()
|
||||
self._name: str = f"{self.__class__.__name__}#{obj_count(self)}"
|
||||
self._proxies: List[Proxy] = self._create_proxies(observers)
|
||||
|
||||
@property
|
||||
def id(self) -> int:
|
||||
return self._id
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return self._name
|
||||
|
||||
async def stop(self):
|
||||
"""Stops all proxy observer tasks."""
|
||||
for proxy in self._proxies:
|
||||
proxy.task.cancel()
|
||||
await proxy.task
|
||||
await cancel_task(proxy.task)
|
||||
|
||||
async def on_push_frame(
|
||||
self,
|
||||
@@ -79,19 +90,24 @@ class TaskObserver(BaseObserver):
|
||||
|
||||
def _create_proxies(self, observers) -> List[Proxy]:
|
||||
proxies = []
|
||||
loop = asyncio.get_running_loop()
|
||||
for observer in observers:
|
||||
queue = asyncio.Queue()
|
||||
task = asyncio.create_task(self._proxy_task_handler(queue, observer))
|
||||
task = create_task(
|
||||
loop,
|
||||
self._proxy_task_handler(queue, observer),
|
||||
f"{self}::{observer.__class__.__name__}",
|
||||
)
|
||||
proxy = Proxy(queue=queue, task=task, observer=observer)
|
||||
proxies.append(proxy)
|
||||
return proxies
|
||||
|
||||
async def _proxy_task_handler(self, queue: asyncio.Queue, observer: BaseObserver):
|
||||
while True:
|
||||
try:
|
||||
data = await queue.get()
|
||||
await observer.on_push_frame(
|
||||
data.src, data.dst, data.frame, data.direction, data.timestamp
|
||||
)
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
data = await queue.get()
|
||||
await observer.on_push_frame(
|
||||
data.src, data.dst, data.frame, data.direction, data.timestamp
|
||||
)
|
||||
|
||||
def __str__(self):
|
||||
return self.name
|
||||
|
||||
@@ -4,8 +4,6 @@
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
|
||||
from pipecat.frames.frames import CancelFrame, EndFrame, Frame, StartFrame
|
||||
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
@@ -38,18 +36,14 @@ class GatedOpenAILLMContextAggregator(FrameProcessor):
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
async def _start(self):
|
||||
self._gate_task = self.get_event_loop().create_task(self._gate_task_handler())
|
||||
self._gate_task = self.create_task(self._gate_task_handler())
|
||||
|
||||
async def _stop(self):
|
||||
self._gate_task.cancel()
|
||||
await self._gate_task
|
||||
await self.cancel_task(self._gate_task)
|
||||
|
||||
async def _gate_task_handler(self):
|
||||
while True:
|
||||
try:
|
||||
await self._notifier.wait()
|
||||
if self._last_context_frame:
|
||||
await self.push_frame(self._last_context_frame)
|
||||
self._last_context_frame = None
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
await self._notifier.wait()
|
||||
if self._last_context_frame:
|
||||
await self.push_frame(self._last_context_frame)
|
||||
self._last_context_frame = None
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
|
||||
from pipecat.audio.utils import interleave_stereo_audio, mix_audio, resample_audio
|
||||
from pipecat.frames.frames import (
|
||||
EndFrame,
|
||||
Frame,
|
||||
InputAudioRawFrame,
|
||||
OutputAudioRawFrame,
|
||||
@@ -86,6 +87,9 @@ class AudioBufferProcessor(FrameProcessor):
|
||||
if self._buffer_size > 0 and len(self._user_audio_buffer) > self._buffer_size:
|
||||
await self._call_on_audio_data_handler()
|
||||
|
||||
if isinstance(frame, EndFrame):
|
||||
await self._call_on_audio_data_handler()
|
||||
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
async def _call_on_audio_data_handler(self):
|
||||
|
||||
@@ -6,15 +6,15 @@
|
||||
|
||||
import asyncio
|
||||
import inspect
|
||||
import sys
|
||||
from enum import Enum
|
||||
from typing import Awaitable, Callable, Optional
|
||||
from typing import Awaitable, Callable, Coroutine, Optional
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.clocks.base_clock import BaseClock
|
||||
from pipecat.frames.frames import (
|
||||
CancelFrame,
|
||||
EndFrame,
|
||||
ErrorFrame,
|
||||
Frame,
|
||||
StartFrame,
|
||||
@@ -24,6 +24,7 @@ from pipecat.frames.frames import (
|
||||
)
|
||||
from pipecat.metrics.metrics import LLMTokenUsage, MetricsData
|
||||
from pipecat.processors.metrics.frame_processor_metrics import FrameProcessorMetrics
|
||||
from pipecat.utils.asyncio import cancel_task, create_task, wait_for_task
|
||||
from pipecat.utils.utils import obj_count, obj_id
|
||||
|
||||
|
||||
@@ -41,8 +42,8 @@ class FrameProcessor:
|
||||
loop: asyncio.AbstractEventLoop | None = None,
|
||||
**kwargs,
|
||||
):
|
||||
self.id: int = obj_id()
|
||||
self.name = name or f"{self.__class__.__name__}#{obj_count(self)}"
|
||||
self._id: int = obj_id()
|
||||
self._name = name or f"{self.__class__.__name__}#{obj_count(self)}"
|
||||
self._parent: "FrameProcessor" | None = None
|
||||
self._prev: "FrameProcessor" | None = None
|
||||
self._next: "FrameProcessor" | None = None
|
||||
@@ -83,6 +84,14 @@ class FrameProcessor:
|
||||
# the exception to this rule. This creates this task.
|
||||
self.__create_push_task()
|
||||
|
||||
@property
|
||||
def id(self) -> int:
|
||||
return self._id
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return self._name
|
||||
|
||||
@property
|
||||
def interruptions_allowed(self):
|
||||
return self._allow_interruptions
|
||||
@@ -141,6 +150,16 @@ class FrameProcessor:
|
||||
await self.stop_ttfb_metrics()
|
||||
await self.stop_processing_metrics()
|
||||
|
||||
def create_task(self, coroutine: Coroutine) -> asyncio.Task:
|
||||
name = f"{self}::{coroutine.cr_code.co_name}"
|
||||
return create_task(self.get_event_loop(), coroutine, name)
|
||||
|
||||
async def cancel_task(self, task: asyncio.Task, timeout: Optional[float] = None):
|
||||
await cancel_task(task, timeout)
|
||||
|
||||
async def wait_for_task(self, task: asyncio.Task, timeout: Optional[float] = None):
|
||||
await wait_for_task(task, timeout)
|
||||
|
||||
async def cleanup(self):
|
||||
await self.__cancel_input_task()
|
||||
await self.__cancel_push_task()
|
||||
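Review note: the new `FrameProcessor.create_task()` derives the task name from the coroutine object via `cr_code.co_name`. The sketch below shows what that expression yields for an ordinary `async def` coroutine. `task_name_for`, `demo`, and the `"Gate#0"` owner string are hypothetical, and the standard `loop.create_task(..., name=...)` call is used in place of the project's own `create_task` utility, whose body is not shown here.

```python
import asyncio
from typing import Coroutine


def task_name_for(owner: str, coroutine: Coroutine) -> str:
    # `cr_code` is the code object of the coroutine function; `co_name`
    # is the name the function was defined with.
    return f"{owner}::{coroutine.cr_code.co_name}"


async def _gate_task_handler() -> None:
    await asyncio.sleep(0)


async def demo() -> str:
    coro = _gate_task_handler()
    name = task_name_for("Gate#0", coro)
    task = asyncio.get_running_loop().create_task(coro, name=name)
    await task
    return task.get_name()
```

Names of this form are what make a dangling-task warning readable, since each leftover task identifies both its owner and its handler.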
@@ -188,7 +207,6 @@ class FrameProcessor:
|
||||
async def resume_processing_frames(self):
|
||||
logger.trace(f"{self}: resuming frame processing")
|
||||
self.__input_event.set()
|
||||
self.__should_block_frames = False
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
if isinstance(frame, StartFrame):
|
||||
@@ -283,61 +301,44 @@ class FrameProcessor:
|
||||
def __create_input_task(self):
|
||||
self.__should_block_frames = False
|
||||
self.__input_queue = asyncio.Queue()
|
||||
self.__input_frame_task = self.get_event_loop().create_task(
|
||||
self.__input_frame_task_handler()
|
||||
)
|
||||
self.__input_event = asyncio.Event()
|
||||
self.__input_frame_task = self.create_task(self.__input_frame_task_handler())
|
||||
|
||||
async def __cancel_input_task(self):
|
||||
self.__input_frame_task.cancel()
|
||||
await self.__input_frame_task
|
||||
await self.cancel_task(self.__input_frame_task)
|
||||
|
||||
async def __input_frame_task_handler(self):
|
||||
while True:
|
||||
try:
|
||||
if self.__should_block_frames:
|
||||
logger.trace(f"{self}: frame processing paused")
|
||||
await self.__input_event.wait()
|
||||
self.__input_event.clear()
|
||||
logger.trace(f"{self}: frame processing resumed")
|
||||
if self.__should_block_frames:
|
||||
logger.trace(f"{self}: frame processing paused")
|
||||
await self.__input_event.wait()
|
||||
self.__input_event.clear()
|
||||
self.__should_block_frames = False
|
||||
logger.trace(f"{self}: frame processing resumed")
|
||||
|
||||
(frame, direction, callback) = await self.__input_queue.get()
|
||||
(frame, direction, callback) = await self.__input_queue.get()
|
||||
|
||||
# Process the frame.
|
||||
await self.process_frame(frame, direction)
|
||||
# Process the frame.
|
||||
await self.process_frame(frame, direction)
|
||||
|
||||
# If this frame has an associated callback, call it now.
|
||||
if callback:
|
||||
await callback(self, frame, direction)
|
||||
# If this frame has an associated callback, call it now.
|
||||
if callback:
|
||||
await callback(self, frame, direction)
|
||||
|
||||
self.__input_queue.task_done()
|
||||
except asyncio.CancelledError:
|
||||
logger.trace(f"{self}: cancelled input task")
|
||||
break
|
||||
except Exception as e:
|
||||
logger.exception(f"{self}: Uncaught exception {e}")
|
||||
await self.push_error(ErrorFrame(str(e)))
|
||||
self.__input_queue.task_done()
|
||||
|
||||
def __create_push_task(self):
|
||||
self.__push_queue = asyncio.Queue()
|
||||
self.__push_frame_task = self.get_event_loop().create_task(self.__push_frame_task_handler())
|
||||
self.__push_frame_task = self.create_task(self.__push_frame_task_handler())
|
||||
|
||||
async def __cancel_push_task(self):
|
||||
self.__push_frame_task.cancel()
|
||||
await self.__push_frame_task
|
||||
await self.cancel_task(self.__push_frame_task)
|
||||
|
||||
async def __push_frame_task_handler(self):
|
||||
while True:
|
||||
try:
|
||||
(frame, direction) = await self.__push_queue.get()
|
||||
await self.__internal_push_frame(frame, direction)
|
||||
self.__push_queue.task_done()
|
||||
except asyncio.CancelledError:
|
||||
logger.trace(f"{self}: cancelled push task")
|
||||
break
|
||||
except Exception as e:
|
||||
logger.exception(f"{self}: Uncaught exception {e}")
|
||||
await self.push_error(ErrorFrame(str(e)))
|
||||
(frame, direction) = await self.__push_queue.get()
|
||||
await self.__internal_push_frame(frame, direction)
|
||||
self.__push_queue.task_done()
|
||||
|
||||
async def _call_event_handler(self, event_name: str, *args, **kwargs):
|
||||
try:
|
||||
|
||||
@@ -58,6 +58,7 @@ from pipecat.processors.aggregators.openai_llm_context import (
|
||||
OpenAILLMContextFrame,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.services.google.frames import LLMSearchOrigin, LLMSearchResponseFrame
|
||||
from pipecat.utils.string import match_endofsentence
|
||||
|
||||
RTVI_PROTOCOL_VERSION = "0.3.0"
|
||||
@@ -295,6 +296,12 @@ class RTVITextMessageData(BaseModel):
|
||||
text: str
|
||||
|
||||
|
||||
class RTVISearchResponseMessageData(BaseModel):
|
||||
search_result: Optional[str]
|
||||
rendered_content: Optional[str]
|
||||
origins: List[LLMSearchOrigin]
|
||||
|
||||
|
||||
class RTVIBotTranscriptionMessage(BaseModel):
|
||||
label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
|
||||
type: Literal["bot-transcription"] = "bot-transcription"
|
||||
@@ -307,6 +314,12 @@ class RTVIBotLLMTextMessage(BaseModel):
|
||||
data: RTVITextMessageData
|
||||
|
||||
|
||||
class RTVIBotLLMSearchResponseMessage(BaseModel):
|
||||
label: Literal["rtvi-ai"] = "rtvi-ai"
|
||||
type: Literal["bot-llm-search-response"] = "bot-llm-search-response"
|
||||
data: RTVISearchResponseMessageData
|
||||
|
||||
|
||||
class RTVIBotTTSTextMessage(BaseModel):
|
||||
label: RTVIMessageLiteral = RTVI_MESSAGE_LABEL
|
||||
type: Literal["bot-tts-text"] = "bot-tts-text"
|
||||
@@ -610,6 +623,8 @@ class RTVIObserver(BaseObserver):
|
||||
await self._push_transport_message_urgent(RTVIBotLLMStoppedMessage())
|
||||
elif isinstance(frame, LLMTextFrame):
|
||||
await self._handle_llm_text_frame(frame)
|
||||
elif isinstance(frame, LLMSearchResponseFrame):
|
||||
await self._handle_llm_search_response_frame(frame)
|
||||
elif isinstance(frame, TTSStartedFrame):
|
||||
await self._push_transport_message_urgent(RTVIBotTTSStartedMessage())
|
||||
elif isinstance(frame, TTSStoppedFrame):
|
||||
@@ -660,6 +675,16 @@ class RTVIObserver(BaseObserver):
|
||||
if match_endofsentence(self._bot_transcription):
|
||||
await self._push_bot_transcription()
|
||||
|
||||
async def _handle_llm_search_response_frame(self, frame: LLMSearchResponseFrame):
|
||||
message = RTVIBotLLMSearchResponseMessage(
|
||||
data=RTVISearchResponseMessageData(
|
||||
search_result=frame.search_result,
|
||||
origins=frame.origins,
|
||||
rendered_content=frame.rendered_content,
|
||||
)
|
||||
)
|
||||
await self._push_transport_message_urgent(message)
|
||||
|
||||
async def _handle_user_transcriptions(self, frame: Frame):
|
||||
message = None
|
||||
if isinstance(frame, TranscriptionFrame):
|
||||
@@ -679,17 +704,20 @@ class RTVIObserver(BaseObserver):
             await self._push_transport_message_urgent(message)
 
     async def _handle_context(self, frame: OpenAILLMContextFrame):
-        messages = frame.context.messages
-        if len(messages) > 0:
-            message = messages[-1]
-            if message["role"] == "user":
-                content = message["content"]
-                if isinstance(content, list):
-                    text = " ".join(item["text"] for item in content if "text" in item)
-                else:
-                    text = content
-                rtvi_message = RTVIUserLLMTextMessage(data=RTVITextMessageData(text=text))
-                await self._push_transport_message_urgent(rtvi_message)
+        try:
+            messages = frame.context.messages
+            if len(messages) > 0:
+                message = messages[-1]
+                if message["role"] == "user":
+                    content = message["content"]
+                    if isinstance(content, list):
+                        text = " ".join(item["text"] for item in content if "text" in item)
+                    else:
+                        text = content
+                    rtvi_message = RTVIUserLLMTextMessage(data=RTVITextMessageData(text=text))
+                    await self._push_transport_message_urgent(rtvi_message)
+        except TypeError as e:
+            logger.warning(f"Caught an error while trying to handle context: {e}")
 
     async def _handle_metrics(self, frame: MetricsFrame):
         metrics = {}
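Note on the `_handle_context` change above: the handler accepts message content that is either a plain string or a list of parts, and joins only the parts that carry a `"text"` key. The following is a minimal standalone sketch of that extraction, not the project's code; the helper name `user_text_from_message` and the choice to return `None` instead of logging are assumptions made for illustration.

```python
from typing import Any, Dict, Optional


def user_text_from_message(message: Dict[str, Any]) -> Optional[str]:
    """Return the text of a user message, or None if there is nothing usable.

    Hypothetical helper: mirrors the string-or-list handling in the diff,
    but returns None where the original would hit a TypeError.
    """
    if message.get("role") != "user":
        return None
    content = message.get("content")
    if isinstance(content, list):
        # Multi-part content: keep only the parts that have a "text" field.
        return " ".join(
            item["text"] for item in content if isinstance(item, dict) and "text" in item
        )
    if isinstance(content, str):
        return content
    # Anything else (None, numbers, ...) is treated as "no text".
    return None
```

Returning `None` for unexpected content types is one way to avoid the `TypeError` that the new `try`/`except` guards against; the diff itself keeps the original logic and logs a warning instead.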
@@ -736,11 +764,11 @@ class RTVIProcessor(FrameProcessor):
 
         # A task to process incoming action frames.
         self._action_queue = asyncio.Queue()
-        self._action_task = self.get_event_loop().create_task(self._action_task_handler())
+        self._action_task = self.create_task(self._action_task_handler())
 
         # A task to process incoming transport messages.
         self._message_queue = asyncio.Queue()
-        self._message_task = self.get_event_loop().create_task(self._message_task_handler())
+        self._message_task = self.create_task(self._message_task_handler())
 
         self._register_event_handler("on_bot_started")
         self._register_event_handler("on_client_ready")
@@ -845,13 +873,11 @@ class RTVIProcessor(FrameProcessor):
 
     async def _cancel_tasks(self):
         if self._action_task:
-            self._action_task.cancel()
-            await self._action_task
+            await self.cancel_task(self._action_task)
             self._action_task = None
 
         if self._message_task:
-            self._message_task.cancel()
-            await self._message_task
+            await self.cancel_task(self._message_task)
             self._message_task = None
 
     async def _push_transport_message(self, model: BaseModel, exclude_none: bool = True):
@@ -860,21 +886,15 @@ class RTVIProcessor(FrameProcessor):
 
     async def _action_task_handler(self):
         while True:
-            try:
-                frame = await self._action_queue.get()
-                await self._handle_action(frame.message_id, frame.rtvi_action_run)
-                self._action_queue.task_done()
-            except asyncio.CancelledError:
-                break
+            frame = await self._action_queue.get()
+            await self._handle_action(frame.message_id, frame.rtvi_action_run)
+            self._action_queue.task_done()
 
     async def _message_task_handler(self):
         while True:
-            try:
-                message = await self._message_queue.get()
-                await self._handle_message(message)
-                self._message_queue.task_done()
-            except asyncio.CancelledError:
-                break
+            message = await self._message_queue.get()
+            await self._handle_message(message)
+            self._message_queue.task_done()
 
     async def _handle_transport_message(self, frame: TransportMessageUrgentFrame):
         try:
@@ -49,12 +49,11 @@ class IdleFrameProcessor(FrameProcessor):
                     self._idle_event.set()
 
     async def cleanup(self):
-        self._idle_task.cancel()
-        await self._idle_task
+        await self.cancel_task(self._idle_task)
 
     def _create_idle_task(self):
         self._idle_event = asyncio.Event()
-        self._idle_task = self.get_event_loop().create_task(self._idle_task_handler())
+        self._idle_task = self.create_task(self._idle_task_handler())
 
     async def _idle_task_handler(self):
         while True:
@@ -62,7 +61,5 @@ class IdleFrameProcessor(FrameProcessor):
                 await asyncio.wait_for(self._idle_event.wait(), timeout=self._timeout)
             except asyncio.TimeoutError:
                 await self._callback(self)
-            except asyncio.CancelledError:
-                break
             finally:
                 self._idle_event.clear()
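Note on the `create_task` / `cancel_task` pattern used throughout this comparison: the hunks replace the manual `task.cancel(); await task` sequence and the per-handler `except asyncio.CancelledError` blocks with two processor-level helpers. The diff does not show how those helpers are implemented, so the following is only a sketch of what such a pair could look like. The class name `TaskOwner` and the timeout handling are assumptions, not Pipecat's implementation.

```python
import asyncio
from typing import Coroutine, Optional


class TaskOwner:
    """Hypothetical stand-in for a processor that owns background tasks."""

    def create_task(self, coro: Coroutine) -> asyncio.Task:
        # Schedule the coroutine on the currently running event loop.
        return asyncio.get_running_loop().create_task(coro)

    async def cancel_task(self, task: asyncio.Task, timeout: Optional[float] = None) -> None:
        # Request cancellation, then wait until the task has actually finished.
        task.cancel()
        try:
            if timeout is not None:
                await asyncio.wait_for(task, timeout=timeout)
            else:
                await task
        except asyncio.CancelledError:
            # Expected: the task ended because it was cancelled.
            pass
        except asyncio.TimeoutError:
            print(f"timed out waiting for {task.get_name()} to finish")


async def demo() -> bool:
    owner = TaskOwner()

    async def forever() -> None:
        # The handler no longer needs its own try/except CancelledError.
        while True:
            await asyncio.sleep(3600)

    task = owner.create_task(forever())
    await asyncio.sleep(0)  # give the task a chance to start
    await owner.cancel_task(task, timeout=1.0)
    return task.cancelled()
```

Centralising the `CancelledError` handling in one helper is what lets the task handlers in the hunks above drop their own `try`/`except` wrappers.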
@@ -5,7 +5,8 @@
 #
 
 import asyncio
-from typing import Awaitable, Callable
+import inspect
+from typing import Awaitable, Callable, Union
 
 from pipecat.frames.frames import (
     BotSpeakingFrame,
@@ -25,11 +26,23 @@ class UserIdleProcessor(FrameProcessor):
     or BotSpeaking).
 
     Args:
-        callback: Function to call when user is idle
+        callback: Function to call when user is idle. Can be either:
+            - Basic callback(processor) -> None
+            - Retry callback(processor, retry_count) -> bool
+              Return True to continue monitoring for idle events,
+              Return False to stop the idle monitoring task
         timeout: Seconds to wait before considering user idle
         **kwargs: Additional arguments passed to FrameProcessor
 
     Example:
+        # Retry callback:
+        async def handle_idle(processor: "UserIdleProcessor", retry_count: int) -> bool:
+            if retry_count < 3:
+                await send_reminder("Are you still there?")
+                return True
+            return False
+
+        # Basic callback:
         async def handle_idle(processor: "UserIdleProcessor") -> None:
             await send_reminder("Are you still there?")
 
@@ -42,34 +55,68 @@ class UserIdleProcessor(FrameProcessor):
     def __init__(
         self,
         *,
-        callback: Callable[["UserIdleProcessor"], Awaitable[None]],
+        callback: Union[
+            Callable[["UserIdleProcessor"], Awaitable[None]],  # Basic
+            Callable[["UserIdleProcessor", int], Awaitable[bool]],  # Retry
+        ],
         timeout: float,
         **kwargs,
     ):
         super().__init__(**kwargs)
-        self._callback = callback
+        self._callback = self._wrap_callback(callback)
         self._timeout = timeout
+        self._retry_count = 0
         self._interrupted = False
         self._conversation_started = False
         self._idle_task = None
         self._idle_event = asyncio.Event()
 
-    def _create_idle_task(self):
-        """Create the idle task if it hasn't been created yet."""
-        if self._idle_task is None:
-            self._idle_task = self.get_event_loop().create_task(self._idle_task_handler())
+    def _wrap_callback(
+        self,
+        callback: Union[
+            Callable[["UserIdleProcessor"], Awaitable[None]],
+            Callable[["UserIdleProcessor", int], Awaitable[bool]],
+        ],
+    ) -> Callable[["UserIdleProcessor", int], Awaitable[bool]]:
+        """Wraps callback to support both basic and retry signatures.
 
-    async def _stop(self):
+        Args:
+            callback: The callback function to wrap.
+
+        Returns:
+            A wrapped callback that returns bool to indicate whether to continue monitoring.
+        """
+        sig = inspect.signature(callback)
+        param_count = len(sig.parameters)
+
+        async def wrapper(processor: "UserIdleProcessor", retry_count: int) -> bool:
+            if param_count == 1:
+                # Basic callback
+                await callback(processor)  # type: ignore
+                return True
+            else:
+                # Retry callback
+                return await callback(processor, retry_count)  # type: ignore
+
+        return wrapper
+
+    def _create_idle_task(self) -> None:
+        """Creates the idle task if it hasn't been created yet."""
+        if self._idle_task is None:
+            self._idle_task = self.create_task(self._idle_task_handler())
+
+    @property
+    def retry_count(self) -> int:
+        """Returns the current retry count."""
+        return self._retry_count
+
+    async def _stop(self) -> None:
         """Stops and cleans up the idle monitoring task."""
         if self._idle_task is not None:
-            self._idle_task.cancel()
-            try:
-                await self._idle_task
-            except asyncio.CancelledError:
-                pass  # Expected when task is cancelled
+            await self.cancel_task(self._idle_task)
             self._idle_task = None
 
-    async def process_frame(self, frame: Frame, direction: FrameDirection):
+    async def process_frame(self, frame: Frame, direction: FrameDirection) -> None:
         """Processes incoming frames and manages idle monitoring state.
 
         Args:
@@ -98,6 +145,7 @@ class UserIdleProcessor(FrameProcessor):
         if self._conversation_started:
             # We shouldn't call the idle callback if the user or the bot are speaking
             if isinstance(frame, UserStartedSpeakingFrame):
+                self._retry_count = 0  # Reset retry count when user speaks
                 self._interrupted = True
                 self._idle_event.set()
             elif isinstance(frame, UserStoppedSpeakingFrame):
@@ -106,23 +154,26 @@ class UserIdleProcessor(FrameProcessor):
|
||||
elif isinstance(frame, BotSpeakingFrame):
|
||||
self._idle_event.set()
|
||||
|
||||
async def cleanup(self):
|
||||
async def cleanup(self) -> None:
|
||||
"""Cleans up resources when processor is shutting down."""
|
||||
await super().cleanup()
|
||||
if self._idle_task: # Only stop if task exists
|
||||
await self._stop()
|
||||
|
||||
async def _idle_task_handler(self):
|
||||
async def _idle_task_handler(self) -> None:
|
||||
"""Monitors for idle timeout and triggers callbacks.
|
||||
|
||||
Runs in a loop until cancelled.
|
||||
Runs in a loop until cancelled or callback indicates completion.
|
||||
"""
|
||||
while True:
|
||||
try:
|
||||
await asyncio.wait_for(self._idle_event.wait(), timeout=self._timeout)
|
||||
except asyncio.TimeoutError:
|
||||
if not self._interrupted:
|
||||
await self._callback(self)
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
self._retry_count += 1
|
||||
should_continue = await self._callback(self, self._retry_count)
|
||||
if not should_continue:
|
||||
await self._stop()
|
||||
break
|
||||
finally:
|
||||
self._idle_event.clear()
|
||||
|
||||
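Note on the `UserIdleProcessor` callback change above: `_wrap_callback` inspects the number of parameters of the user's callback and adapts both the one-argument and the two-argument form to a single `(processor, retry_count) -> bool` shape. Below is a minimal standalone sketch of the same dispatch, written as a free function so it runs without Pipecat. The names `wrap_callback` and `demo`, and the use of `None` in place of a processor, are illustrative assumptions.

```python
import asyncio
import inspect
from typing import Any, Awaitable, Callable, List, Tuple

RetryCallback = Callable[[Any, int], Awaitable[bool]]


def wrap_callback(callback: Callable[..., Awaitable[Any]]) -> RetryCallback:
    """Adapt a basic (1-arg) or retry (2-arg) callback to the retry shape."""
    param_count = len(inspect.signature(callback).parameters)

    async def wrapper(processor: Any, retry_count: int) -> bool:
        if param_count == 1:
            # Basic callback: no return value, so monitoring always continues.
            await callback(processor)
            return True
        # Retry callback: its bool result decides whether monitoring continues.
        return await callback(processor, retry_count)

    return wrapper


async def demo() -> Tuple[List[str], List[bool]]:
    calls: List[str] = []

    async def basic(processor: Any) -> None:
        calls.append("basic")

    async def retry(processor: Any, retry_count: int) -> bool:
        calls.append(f"retry-{retry_count}")
        return retry_count < 3

    results = [await wrap_callback(basic)(None, 1)]
    wrapped_retry = wrap_callback(retry)
    for attempt in (1, 2, 3):
        results.append(await wrapped_retry(None, attempt))
    return calls, results
```

Because the dispatch counts every parameter, a callback that declares extra parameters with defaults, or `*args`, would be treated as a retry callback under this approach.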
@@ -8,7 +8,7 @@ import asyncio
 import io
 import wave
 from abc import abstractmethod
-from typing import Any, AsyncGenerator, Dict, List, Optional, Tuple
+from typing import Any, AsyncGenerator, Dict, List, Mapping, Optional, Tuple
 
 from loguru import logger
 
@@ -69,7 +69,7 @@ class AIService(FrameProcessor):
     async def cancel(self, frame: CancelFrame):
         pass
 
-    async def _update_settings(self, settings: Dict[str, Any]):
+    async def _update_settings(self, settings: Mapping[str, Any]):
         from pipecat.services.openai_realtime_beta.events import (
             SessionProperties,
         )
@@ -253,23 +253,21 @@ class TTSService(AIService):
     async def start(self, frame: StartFrame):
         await super().start(frame)
         if self._push_stop_frames:
-            self._stop_frame_task = self.get_event_loop().create_task(self._stop_frame_handler())
+            self._stop_frame_task = self.create_task(self._stop_frame_handler())
 
     async def stop(self, frame: EndFrame):
         await super().stop(frame)
         if self._stop_frame_task:
-            self._stop_frame_task.cancel()
-            await self._stop_frame_task
+            await self.cancel_task(self._stop_frame_task)
             self._stop_frame_task = None
 
     async def cancel(self, frame: CancelFrame):
         await super().cancel(frame)
         if self._stop_frame_task:
-            self._stop_frame_task.cancel()
-            await self._stop_frame_task
+            await self.cancel_task(self._stop_frame_task)
             self._stop_frame_task = None
 
-    async def _update_settings(self, settings: Dict[str, Any]):
+    async def _update_settings(self, settings: Mapping[str, Any]):
         for key, value in settings.items():
             if key in self._settings:
                 logger.info(f"Updating TTS setting {key} to: [{value}]")
@@ -364,23 +362,20 @@ class TTSService(AIService):
             await self.push_frame(TTSTextFrame(text))
 
     async def _stop_frame_handler(self):
-        try:
-            has_started = False
-            while True:
-                try:
-                    frame = await asyncio.wait_for(
-                        self._stop_frame_queue.get(), self._stop_frame_timeout_s
-                    )
-                    if isinstance(frame, TTSStartedFrame):
-                        has_started = True
-                    elif isinstance(frame, (TTSStoppedFrame, StartInterruptionFrame)):
-                        has_started = False
-                except asyncio.TimeoutError:
-                    if has_started:
-                        await self.push_frame(TTSStoppedFrame())
-                        has_started = False
-        except asyncio.CancelledError:
-            pass
+        has_started = False
+        while True:
+            try:
+                frame = await asyncio.wait_for(
+                    self._stop_frame_queue.get(), self._stop_frame_timeout_s
+                )
+                if isinstance(frame, TTSStartedFrame):
+                    has_started = True
+                elif isinstance(frame, (TTSStoppedFrame, StartInterruptionFrame)):
+                    has_started = False
+            except asyncio.TimeoutError:
+                if has_started:
+                    await self.push_frame(TTSStoppedFrame())
+                    has_started = False
 
 
 class WordTTSService(TTSService):
@@ -388,7 +383,7 @@ class WordTTSService(TTSService):
         super().__init__(**kwargs)
         self._initial_word_timestamp = -1
         self._words_queue = asyncio.Queue()
-        self._words_task = self.get_event_loop().create_task(self._words_task_handler())
+        self._words_task = self.create_task(self._words_task_handler())
 
     def start_word_timestamps(self):
         if self._initial_word_timestamp == -1:
@@ -421,35 +416,29 @@ class WordTTSService(TTSService):
 
     async def _stop_words_task(self):
         if self._words_task:
-            self._words_task.cancel()
-            await self._words_task
+            await self.cancel_task(self._words_task)
             self._words_task = None
 
     async def _words_task_handler(self):
         last_pts = 0
         while True:
-            try:
-                (word, timestamp) = await self._words_queue.get()
-                if word == "Reset" and timestamp == 0:
-                    self.reset_word_timestamps()
-                    frame = None
-                elif word == "LLMFullResponseEndFrame" and timestamp == 0:
-                    frame = LLMFullResponseEndFrame()
-                    frame.pts = last_pts
-                elif word == "TTSStoppedFrame" and timestamp == 0:
-                    frame = TTSStoppedFrame()
-                    frame.pts = last_pts
-                else:
-                    frame = TTSTextFrame(word)
-                    frame.pts = self._initial_word_timestamp + timestamp
-                if frame:
-                    last_pts = frame.pts
-                    await self.push_frame(frame)
-                self._words_queue.task_done()
-            except asyncio.CancelledError:
-                break
-            except Exception as e:
-                logger.exception(f"{self} exception: {e}")
+            (word, timestamp) = await self._words_queue.get()
+            if word == "Reset" and timestamp == 0:
+                self.reset_word_timestamps()
+                frame = None
+            elif word == "LLMFullResponseEndFrame" and timestamp == 0:
+                frame = LLMFullResponseEndFrame()
+                frame.pts = last_pts
+            elif word == "TTSStoppedFrame" and timestamp == 0:
+                frame = TTSStoppedFrame()
+                frame.pts = last_pts
+            else:
+                frame = TTSTextFrame(word)
+                frame.pts = self._initial_word_timestamp + timestamp
+            if frame:
+                last_pts = frame.pts
+                await self.push_frame(frame)
+            self._words_queue.task_done()
 
 
 class STTService(AIService):
@@ -479,7 +468,7 @@ class STTService(AIService):
         """Returns transcript as a string"""
         pass
 
-    async def _update_settings(self, settings: Dict[str, Any]):
+    async def _update_settings(self, settings: Mapping[str, Any]):
         logger.info(f"Updating STT settings: {self._settings}")
         for key, value in settings.items():
             if key in self._settings:
@@ -88,7 +88,7 @@ class CartesiaTTSService(WordTTSService, WebsocketService):
         voice_id: str,
         cartesia_version: str = "2024-06-10",
         url: str = "wss://api.cartesia.ai/tts/websocket",
-        model: str = "sonic-english",
+        model: str = "sonic",
         sample_rate: int = 24000,
         encoding: str = "pcm_s16le",
         container: str = "raw",
@@ -187,16 +187,13 @@ class CartesiaTTSService(WordTTSService, WebsocketService):
     async def _connect(self):
         await self._connect_websocket()
 
-        self._receive_task = self.get_event_loop().create_task(
-            self._receive_task_handler(self.push_error)
-        )
+        self._receive_task = self.create_task(self._receive_task_handler(self.push_error))
 
     async def _disconnect(self):
         await self._disconnect_websocket()
 
         if self._receive_task:
-            self._receive_task.cancel()
-            await self._receive_task
+            await self.cancel_task(self._receive_task)
             self._receive_task = None
 
     async def _connect_websocket(self):
@@ -329,7 +326,7 @@ class CartesiaHttpTTSService(TTSService):
         *,
         api_key: str,
         voice_id: str,
-        model: str = "sonic-english",
+        model: str = "sonic",
         base_url: str = "https://api.cartesia.ai",
         sample_rate: int = 24000,
         encoding: str = "pcm_s16le",
@@ -44,10 +44,11 @@ except ModuleNotFoundError as e:
 
 ElevenLabsOutputFormat = Literal["pcm_16000", "pcm_22050", "pcm_24000", "pcm_44100"]
 
 # Models that support language codes
+# eleven_multilingual_v2 doesn't support language codes, so it's excluded
 ELEVENLABS_MULTILINGUAL_MODELS = {
-    "eleven_turbo_v2_5",
-    "eleven_multilingual_v2",
+    "eleven_flash_v2_5",
+    "eleven_turbo_v2_5",
 }
 
 
@@ -298,20 +299,16 @@ class ElevenLabsTTSService(WordTTSService, WebsocketService):
     async def _connect(self):
         await self._connect_websocket()
 
-        self._receive_task = self.get_event_loop().create_task(
-            self._receive_task_handler(self.push_error)
-        )
-        self._keepalive_task = self.get_event_loop().create_task(self._keepalive_task_handler())
+        self._receive_task = self.create_task(self._receive_task_handler(self.push_error))
+        self._keepalive_task = self.create_task(self._keepalive_task_handler())
 
     async def _disconnect(self):
         if self._receive_task:
-            self._receive_task.cancel()
-            await self._receive_task
+            await self.cancel_task(self._receive_task)
             self._receive_task = None
 
         if self._keepalive_task:
-            self._keepalive_task.cancel()
-            await self._keepalive_task
+            await self.cancel_task(self._keepalive_task)
             self._keepalive_task = None
 
         await self._disconnect_websocket()
@@ -382,13 +379,8 @@ class ElevenLabsTTSService(WordTTSService, WebsocketService):
 
     async def _keepalive_task_handler(self):
         while True:
-            try:
-                await asyncio.sleep(10)
-                await self._send_text("")
-            except asyncio.CancelledError:
-                break
-            except Exception as e:
-                logger.error(f"{self} exception: {e}")
+            await asyncio.sleep(10)
+            await self._send_text("")
 
     async def _send_text(self, text: str):
         if self._websocket:
@@ -104,15 +104,12 @@ class FishAudioTTSService(TTSService, WebsocketService):
 
     async def _connect(self):
         await self._connect_websocket()
-        self._receive_task = self.get_event_loop().create_task(
-            self._receive_task_handler(self.push_error)
-        )
+        self._receive_task = self.create_task(self._receive_task_handler(self.push_error))
 
     async def _disconnect(self):
         await self._disconnect_websocket()
         if self._receive_task:
-            self._receive_task.cancel()
-            await self._receive_task
+            await self.cancel_task(self._receive_task)
             self._receive_task = None
 
     async def _connect_websocket(self):
@@ -182,9 +182,13 @@ class GeminiMultimodalLiveLLMService(LLMService):
|
||||
|
||||
self._audio_input_paused = start_audio_paused
|
||||
self._video_input_paused = start_video_paused
|
||||
self._context = None
|
||||
self._websocket = None
|
||||
self._receive_task = None
|
||||
self._context = None
|
||||
self._transcribe_audio_task = None
|
||||
self._transcribe_model_audio_task = None
|
||||
self._transcribe_audio_queue = asyncio.Queue()
|
||||
self._transcribe_model_audio_queue = asyncio.Queue()
|
||||
|
||||
self._disconnecting = False
|
||||
self._api_session_ready = False
|
||||
@@ -244,6 +248,7 @@ class GeminiMultimodalLiveLLMService(LLMService):
|
||||
|
||||
async def start(self, frame: StartFrame):
|
||||
await super().start(frame)
|
||||
await self._connect()
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
await super().stop(frame)
|
||||
@@ -275,7 +280,7 @@ class GeminiMultimodalLiveLLMService(LLMService):
|
||||
)
|
||||
await self.send_client_event(evt)
|
||||
if self._transcribe_user_audio and self._context:
|
||||
asyncio.create_task(self._handle_transcribe_user_audio(audio, self._context))
|
||||
await self._transcribe_audio_queue.put(audio)
|
||||
|
||||
async def _handle_transcribe_user_audio(self, audio, context):
|
||||
text = await self._transcribe_audio(audio, context)
|
||||
@@ -381,17 +386,21 @@ class GeminiMultimodalLiveLLMService(LLMService):
|
||||
await self._ws_send(event.model_dump(exclude_none=True))
|
||||
|
||||
async def _connect(self):
|
||||
if self._websocket:
|
||||
# Here we assume that if we have a websocket, we are connected. We
|
||||
# handle disconnections in the send/recv code paths.
|
||||
return
|
||||
|
||||
logger.info("Connecting to Gemini service")
|
||||
try:
|
||||
if self._websocket:
|
||||
# Here we assume that if we have a websocket, we are connected. We
|
||||
# handle disconnections in the send/recv code paths.
|
||||
return
|
||||
|
||||
uri = f"wss://{self.base_url}/ws/google.ai.generativelanguage.v1alpha.GenerativeService.BidiGenerateContent?key={self.api_key}"
|
||||
logger.info(f"Connecting to {uri}")
|
||||
self._websocket = await websockets.connect(uri=uri)
|
||||
self._receive_task = self.get_event_loop().create_task(self._receive_task_handler())
|
||||
self._receive_task = self.create_task(self._receive_task_handler())
|
||||
self._transcribe_audio_task = self.create_task(self._transcribe_audio_handler())
|
||||
self._transcribe_model_audio_task = self.create_task(
|
||||
self._transcribe_model_audio_handler()
|
||||
)
|
||||
config = events.Config.model_validate(
|
||||
{
|
||||
"setup": {
|
||||
@@ -441,12 +450,14 @@ class GeminiMultimodalLiveLLMService(LLMService):
|
||||
await self._websocket.close()
|
||||
self._websocket = None
|
||||
if self._receive_task:
|
||||
self._receive_task.cancel()
|
||||
try:
|
||||
await asyncio.wait_for(self._receive_task, timeout=1.0)
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning("Timed out waiting for receive task to finish")
|
||||
await self.cancel_task(self._receive_task, timeout=1.0)
|
||||
self._receive_task = None
|
||||
if self._transcribe_audio_task:
|
||||
await self.cancel_task(self._transcribe_audio_task)
|
||||
self._transcribe_audio_task = None
|
||||
if self._transcribe_model_audio_task:
|
||||
await self.cancel_task(self._transcribe_model_audio_task)
|
||||
self._transcribe_model_audio_task = None
|
||||
self._disconnecting = False
|
||||
except Exception as e:
|
||||
logger.error(f"{self} error disconnecting: {e}")
|
||||
@@ -454,9 +465,8 @@ class GeminiMultimodalLiveLLMService(LLMService):
|
||||
async def _ws_send(self, message):
|
||||
# logger.debug(f"Sending message to websocket: {message}")
|
||||
try:
|
||||
if not self._websocket:
|
||||
await self._connect()
|
||||
await self._websocket.send(json.dumps(message))
|
||||
if self._websocket:
|
||||
await self._websocket.send(json.dumps(message))
|
||||
except Exception as e:
|
||||
if self._disconnecting:
|
||||
return
|
||||
@@ -473,32 +483,35 @@ class GeminiMultimodalLiveLLMService(LLMService):
|
||||
#
|
||||
|
||||
async def _receive_task_handler(self):
|
||||
try:
|
||||
async for message in self._websocket:
|
||||
evt = events.parse_server_event(message)
|
||||
# logger.debug(f"Received event: {message[:500]}")
|
||||
# logger.debug(f"Received event: {evt}")
|
||||
async for message in self._websocket:
|
||||
evt = events.parse_server_event(message)
|
||||
# logger.debug(f"Received event: {message[:500]}")
|
||||
# logger.debug(f"Received event: {evt}")
|
||||
|
||||
if evt.setupComplete:
|
||||
await self._handle_evt_setup_complete(evt)
|
||||
elif evt.serverContent and evt.serverContent.modelTurn:
|
||||
await self._handle_evt_model_turn(evt)
|
||||
elif evt.serverContent and evt.serverContent.turnComplete:
|
||||
await self._handle_evt_turn_complete(evt)
|
||||
elif evt.toolCall:
|
||||
await self._handle_evt_tool_call(evt)
|
||||
if evt.setupComplete:
|
||||
await self._handle_evt_setup_complete(evt)
|
||||
elif evt.serverContent and evt.serverContent.modelTurn:
|
||||
await self._handle_evt_model_turn(evt)
|
||||
elif evt.serverContent and evt.serverContent.turnComplete:
|
||||
await self._handle_evt_turn_complete(evt)
|
||||
elif evt.toolCall:
|
||||
await self._handle_evt_tool_call(evt)
|
||||
elif False: # !!! todo: error events?
|
||||
await self._handle_evt_error(evt)
|
||||
# errors are fatal, so exit the receive loop
|
||||
return
|
||||
else:
|
||||
pass
|
||||
|
||||
elif False: # !!! todo: error events?
|
||||
await self._handle_evt_error(evt)
|
||||
# errors are fatal, so exit the receive loop
|
||||
return
|
||||
async def _transcribe_audio_handler(self):
|
||||
while True:
|
||||
audio = await self._transcribe_audio_queue.get()
|
||||
await self._handle_transcribe_user_audio(audio, self._context)
|
||||
|
||||
else:
|
||||
pass
|
||||
except asyncio.CancelledError:
|
||||
logger.debug("websocket receive task cancelled")
|
||||
except Exception as e:
|
||||
logger.error(f"{self} exception: {e}")
|
||||
async def _transcribe_model_audio_handler(self):
|
||||
while True:
|
||||
audio = await self._transcribe_model_audio_queue.get()
|
||||
await self._handle_transcribe_model_audio(audio, self._context)
|
||||
|
||||
#
|
||||
#
|
||||
@@ -679,7 +692,7 @@ class GeminiMultimodalLiveLLMService(LLMService):
|
||||
self._bot_text_buffer = ""
|
||||
|
||||
if audio and self._transcribe_model_audio and self._context:
|
||||
asyncio.create_task(self._handle_transcribe_model_audio(audio, self._context))
|
||||
await self._transcribe_model_audio_queue.put(audio)
|
||||
elif text:
|
||||
await self.push_frame(LLMFullResponseEndFrame())
|
||||
|
||||
|
||||
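Note on the Gemini transcription change above: instead of spawning one `asyncio.create_task(...)` per audio buffer, the service now puts each buffer on an `asyncio.Queue` that a single long-lived handler task drains. One visible effect is that items are processed one at a time, in arrival order. The sketch below illustrates that property in isolation. The `transcribe` coroutine, its artificial delays, and the `task_done()` / `join()` bookkeeping are assumptions added so the example can run and terminate on its own; they are not taken from the diff.

```python
import asyncio
from typing import List


async def transcribe(audio: bytes) -> str:
    # Hypothetical stand-in for a transcription call: shorter buffers are made
    # to take longer, so unordered concurrent tasks would finish out of order.
    await asyncio.sleep(0.01 * (5 - len(audio)))
    return f"chunk-{len(audio)}"


async def transcribe_worker(queue: "asyncio.Queue[bytes]", results: List[str]) -> None:
    # Single consumer: one item at a time, in the order it was queued.
    while True:
        audio = await queue.get()
        results.append(await transcribe(audio))
        queue.task_done()


async def demo() -> List[str]:
    queue: "asyncio.Queue[bytes]" = asyncio.Queue()
    results: List[str] = []
    worker = asyncio.create_task(transcribe_worker(queue, results))

    for size in (1, 2, 3):
        await queue.put(b"x" * size)

    await queue.join()  # wait until every queued buffer has been handled
    worker.cancel()
    try:
        await worker
    except asyncio.CancelledError:
        pass  # expected when stopping the worker
    return results
```

A single worker also means there is exactly one task to cancel on disconnect, which fits the `cancel_task` calls added for `_transcribe_audio_task` and `_transcribe_model_audio_task`.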
@@ -180,7 +180,7 @@ class GladiaSTTService(STTService):
         await super().start(frame)
         response = await self._setup_gladia()
         self._websocket = await websockets.connect(response["url"])
-        self._receive_task = self.get_event_loop().create_task(self._receive_task_handler())
+        self._receive_task = self.create_task(self._receive_task_handler())
 
     async def stop(self, frame: EndFrame):
         await super().stop(frame)
2
src/pipecat/services/google/__init__.py
Normal file
@@ -0,0 +1,2 @@
+from .frames import LLMSearchResponseFrame
+from .google import *
33
src/pipecat/services/google/frames.py
Normal file
@@ -0,0 +1,33 @@
+#
+# Copyright (c) 2024–2025, Daily
+#
+# SPDX-License-Identifier: BSD 2-Clause License
+#
+
+from dataclasses import dataclass, field
+from typing import List, Optional
+
+from pipecat.frames.frames import DataFrame
+
+
+@dataclass
+class LLMSearchResult:
+    text: str
+    confidence: List[float] = field(default_factory=list)
+
+
+@dataclass
+class LLMSearchOrigin:
+    site_uri: Optional[str] = None
+    site_title: Optional[str] = None
+    results: List[LLMSearchResult] = field(default_factory=list)
+
+
+@dataclass
+class LLMSearchResponseFrame(DataFrame):
+    search_result: Optional[str] = None
+    rendered_content: Optional[str] = None
+    origins: List[LLMSearchOrigin] = field(default_factory=list)
+
+    def __str__(self):
+        return f"LLMSearchResponseFrame(search_result={self.search_result}, origins={self.origins})"
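Note on the new frame types above: `LLMSearchResponseFrame.origins` is annotated as a list of `LLMSearchOrigin`, while the Google service hunk later in this comparison builds `origins` as a list of plain dictionaries with `site_uri`, `site_title` and `results` keys. The sketch below shows one possible way to convert such dictionaries into the typed dataclasses. The dataclasses are re-declared locally so the example runs without Pipecat, and the helper `origins_from_dicts` is hypothetical, not part of the project.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class LLMSearchResult:
    text: str
    confidence: List[float] = field(default_factory=list)


@dataclass
class LLMSearchOrigin:
    site_uri: Optional[str] = None
    site_title: Optional[str] = None
    results: List[LLMSearchResult] = field(default_factory=list)


def origins_from_dicts(raw_origins: List[Dict[str, Any]]) -> List[LLMSearchOrigin]:
    """Hypothetical helper: build typed origins from dict-shaped grounding data."""
    return [
        LLMSearchOrigin(
            site_uri=origin.get("site_uri"),
            site_title=origin.get("site_title"),
            results=[
                LLMSearchResult(
                    text=result.get("text", ""),
                    # A missing or None confidence becomes an empty list.
                    confidence=list(result.get("confidence") or []),
                )
                for result in origin.get("results", [])
            ],
        )
        for origin in raw_origins
    ]
```

Whether downstream consumers expect dictionaries or dataclass instances is not visible in this comparison, so treat the conversion as optional.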
@@ -38,6 +38,7 @@ from pipecat.processors.aggregators.openai_llm_context import (
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.services.ai_services import LLMService, TTSService
|
||||
from pipecat.services.google.frames import LLMSearchResponseFrame
|
||||
from pipecat.services.openai import (
|
||||
OpenAIAssistantContextAggregator,
|
||||
OpenAIUserContextAggregator,
|
||||
@@ -639,6 +640,9 @@ class GoogleLLMService(LLMService):
|
||||
completion_tokens = 0
|
||||
total_tokens = 0
|
||||
|
||||
grounding_metadata = None
|
||||
search_result = ""
|
||||
|
||||
try:
|
||||
logger.debug(
|
||||
# f"Generating chat: {self._system_instruction} | {context.get_messages_for_logging()}"
|
||||
@@ -698,6 +702,7 @@ class GoogleLLMService(LLMService):
|
||||
try:
|
||||
for c in chunk.parts:
|
||||
if c.text:
|
||||
search_result += c.text
|
||||
await self.push_frame(LLMTextFrame(c.text))
|
||||
elif c.function_call:
|
||||
logger.debug(f"!!! Function call: {c.function_call}")
|
||||
@@ -708,6 +713,63 @@ class GoogleLLMService(LLMService):
|
||||
function_name=c.function_call.name,
|
||||
arguments=args,
|
||||
)
|
||||
# Handle grounding metadata
|
||||
# It seems only the last chunk that we receive may contain this information
|
||||
# If the response doesn't include groundingMetadata, this means the response wasn't grounded.
|
||||
if chunk.candidates:
|
||||
for candidate in chunk.candidates:
|
||||
# logger.debug(f"candidate received: {candidate}")
|
||||
# Extract grounding metadata
|
||||
grounding_metadata = (
|
||||
{
|
||||
"rendered_content": getattr(
|
||||
getattr(candidate, "grounding_metadata", None),
|
||||
"search_entry_point",
|
||||
None,
|
||||
).rendered_content
|
||||
if hasattr(
|
||||
getattr(candidate, "grounding_metadata", None),
|
||||
"search_entry_point",
|
||||
)
|
||||
else None,
|
||||
"origins": [
|
||||
{
|
||||
"site_uri": getattr(grounding_chunk.web, "uri", None),
|
||||
"site_title": getattr(
|
||||
grounding_chunk.web, "title", None
|
||||
),
|
||||
"results": [
|
||||
{
|
||||
"text": getattr(
|
||||
grounding_support.segment, "text", ""
|
||||
),
|
||||
"confidence": getattr(
|
||||
grounding_support, "confidence_scores", None
|
||||
),
|
||||
}
|
||||
for grounding_support in getattr(
|
||||
getattr(candidate, "grounding_metadata", None),
|
||||
"grounding_supports",
|
||||
[],
|
||||
)
|
||||
if index
|
||||
in getattr(
|
||||
grounding_support, "grounding_chunk_indices", []
|
||||
)
|
||||
],
|
||||
}
|
||||
for index, grounding_chunk in enumerate(
|
||||
getattr(
|
||||
getattr(candidate, "grounding_metadata", None),
|
||||
"grounding_chunks",
|
||||
[],
|
||||
)
|
||||
)
|
||||
],
|
||||
}
|
||||
if getattr(candidate, "grounding_metadata", None)
|
||||
else None
|
||||
)
|
||||
except Exception as e:
|
||||
# Google LLMs seem to flag safety issues a lot!
|
||||
if chunk.candidates[0].finish_reason == 3:
|
||||
@@ -720,6 +782,14 @@ class GoogleLLMService(LLMService):
|
||||
except Exception as e:
|
||||
logger.exception(f"{self} exception: {e}")
|
||||
finally:
|
||||
if grounding_metadata is not None and isinstance(grounding_metadata, dict):
|
||||
llm_search_frame = LLMSearchResponseFrame(
|
||||
search_result=search_result,
|
||||
origins=grounding_metadata["origins"],
|
||||
rendered_content=grounding_metadata["rendered_content"],
|
||||
)
|
||||
await self.push_frame(llm_search_frame)
|
||||
|
||||
await self.start_llm_usage_metrics(
|
||||
LLMTokenUsage(
|
||||
prompt_tokens=prompt_tokens,
|
||||
@@ -113,16 +113,13 @@ class LmntTTSService(TTSService, WebsocketService):
     async def _connect(self):
         await self._connect_websocket()

-        self._receive_task = self.get_event_loop().create_task(
-            self._receive_task_handler(self.push_error)
-        )
+        self._receive_task = self.create_task(self._receive_task_handler(self.push_error))

     async def _disconnect(self):
         await self._disconnect_websocket()

         if self._receive_task:
-            self._receive_task.cancel()
-            await self._receive_task
+            await self.cancel_task(self._receive_task)
             self._receive_task = None

     async def _connect_websocket(self):
||||
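The hunk above swaps the manual `task.cancel()` / `await task` pair for a single `cancel_task(...)` call. The helper's body is not shown in this part of the diff, so what follows is only a minimal sketch of what such a helper might do, assuming it cancels the task, waits for it to finish, and absorbs the resulting `CancelledError`. The names `cancel_task_sketch`, `receive_forever`, and `demo` are hypothetical and are not Pipecat's implementation.

```python
import asyncio
from typing import Optional


async def cancel_task_sketch(task: asyncio.Task, timeout: Optional[float] = None) -> None:
    """Hypothetical helper: cancel `task` and wait until it has finished."""
    task.cancel()
    try:
        if timeout is not None:
            # Bound the wait so a task that ignores cancellation cannot hang us.
            await asyncio.wait_for(task, timeout=timeout)
        else:
            await task
    except asyncio.CancelledError:
        # Expected outcome: the task acknowledged the cancellation.
        pass
    except asyncio.TimeoutError:
        print(f"timed out waiting for {task.get_name()} to cancel")


async def receive_forever() -> None:
    """Stand-in for a websocket receive loop that never returns on its own."""
    while True:
        await asyncio.sleep(3600)


async def demo(timeout: Optional[float] = None) -> bool:
    task = asyncio.create_task(receive_forever())
    await asyncio.sleep(0)  # give the task a chance to start running
    await cancel_task_sketch(task, timeout=timeout)
    return task.cancelled()
```

Calling `asyncio.run(demo())` should report that the task ended up cancelled. Centralising this in one helper means the call sites no longer need their own `try`/`except asyncio.CancelledError` blocks.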
@@ -167,8 +167,11 @@ class OpenAIRealtimeBetaLLMService(LLMService):
         either the wall clock time or the actual audio duration to prevent invalid truncation
         requests.
         """
+        if not self._current_audio_response:
+            return
+
         # if the bot is still speaking, truncate the last message
-        if self._current_audio_response:
+        try:
             current = self._current_audio_response
             self._current_audio_response = None

@@ -179,6 +182,11 @@ class OpenAIRealtimeBetaLLMService(LLMService):
             elapsed_ms = int(time.time() * 1000 - current.start_time_ms)
             truncate_ms = min(elapsed_ms, audio_duration_ms)

+            logger.trace(
+                f"Truncating audio: duration={audio_duration_ms}ms, "
+                f"elapsed={elapsed_ms}ms, truncate={truncate_ms}ms"
+            )
+
             await self.send_client_event(
                 events.ConversationItemTruncateEvent(
                     item_id=current.item_id,
@@ -186,6 +194,9 @@ class OpenAIRealtimeBetaLLMService(LLMService):
                     audio_end_ms=truncate_ms,
                 )
             )
+        except Exception as e:
+            # Log warning and don't re-raise - allow session to continue
+            logger.warning(f"Audio truncation failed (non-fatal): {e}")

     #
     # frame processing
||||
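The truncation hunks above clamp the truncation point to whichever is smaller: the wall-clock time elapsed since the response started, or the duration of the audio actually generated. A small standalone sketch of that arithmetic follows. The function name `compute_truncate_ms` and the injectable `now_ms` parameter are assumptions added for illustration and testing; they do not appear in the diff.

```python
import time
from typing import Optional


def compute_truncate_ms(
    start_time_ms: float,
    audio_duration_ms: int,
    now_ms: Optional[float] = None,
) -> int:
    """Return the truncation point in ms, never past the end of the audio.

    `now_ms` is a hypothetical parameter that lets callers (and tests) supply
    the current time instead of reading the wall clock.
    """
    if now_ms is None:
        now_ms = time.time() * 1000
    elapsed_ms = int(now_ms - start_time_ms)
    # Same clamp as in the hunk: min(elapsed, duration).
    return min(elapsed_ms, audio_duration_ms)
```

If the bot is interrupted 2.5 s into a 5 s response, the result is the elapsed 2500 ms. If the interruption arrives after playback has already finished, the result is capped at the audio duration, so the truncation point never lies past the end of the audio.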
@@ -266,7 +277,7 @@ class OpenAIRealtimeBetaLLMService(LLMService):
                     "OpenAI-Beta": "realtime=v1",
                 },
             )
-            self._receive_task = self.get_event_loop().create_task(self._receive_task_handler())
+            self._receive_task = self.create_task(self._receive_task_handler())
         except Exception as e:
             logger.error(f"{self} initialization error: {e}")
             self._websocket = None
@@ -280,11 +291,7 @@ class OpenAIRealtimeBetaLLMService(LLMService):
                 await self._websocket.close()
                 self._websocket = None
             if self._receive_task:
-                self._receive_task.cancel()
-                try:
-                    await asyncio.wait_for(self._receive_task, timeout=1.0)
-                except asyncio.TimeoutError:
-                    logger.warning("Timed out waiting for receive task to finish")
+                await self.cancel_task(self._receive_task, timeout=1.0)
                 self._receive_task = None
             self._disconnecting = False
         except Exception as e:
@@ -321,40 +328,32 @@ class OpenAIRealtimeBetaLLMService(LLMService):
     #

     async def _receive_task_handler(self):
-        try:
-            async for message in self._websocket:
-                evt = events.parse_server_event(message)
-                if evt.type == "session.created":
-                    await self._handle_evt_session_created(evt)
-                elif evt.type == "session.updated":
-                    await self._handle_evt_session_updated(evt)
-                elif evt.type == "response.audio.delta":
-                    await self._handle_evt_audio_delta(evt)
-                elif evt.type == "response.audio.done":
-                    await self._handle_evt_audio_done(evt)
-                elif evt.type == "conversation.item.created":
-                    await self._handle_evt_conversation_item_created(evt)
-                elif evt.type == "conversation.item.input_audio_transcription.completed":
-                    await self.handle_evt_input_audio_transcription_completed(evt)
-                elif evt.type == "response.done":
-                    await self._handle_evt_response_done(evt)
-                elif evt.type == "input_audio_buffer.speech_started":
-                    await self._handle_evt_speech_started(evt)
-                elif evt.type == "input_audio_buffer.speech_stopped":
-                    await self._handle_evt_speech_stopped(evt)
-                elif evt.type == "response.audio_transcript.delta":
-                    await self._handle_evt_audio_transcript_delta(evt)
-                elif evt.type == "error":
-                    await self._handle_evt_error(evt)
-                    # errors are fatal, so exit the receive loop
-                    return
-
-                else:
-                    pass
-        except asyncio.CancelledError:
-            logger.debug("websocket receive task cancelled")
-        except Exception as e:
-            logger.error(f"{self} exception: {e}")
+        async for message in self._websocket:
+            evt = events.parse_server_event(message)
+            if evt.type == "session.created":
+                await self._handle_evt_session_created(evt)
+            elif evt.type == "session.updated":
+                await self._handle_evt_session_updated(evt)
+            elif evt.type == "response.audio.delta":
+                await self._handle_evt_audio_delta(evt)
+            elif evt.type == "response.audio.done":
+                await self._handle_evt_audio_done(evt)
+            elif evt.type == "conversation.item.created":
+                await self._handle_evt_conversation_item_created(evt)
+            elif evt.type == "conversation.item.input_audio_transcription.completed":
+                await self.handle_evt_input_audio_transcription_completed(evt)
+            elif evt.type == "response.done":
+                await self._handle_evt_response_done(evt)
+            elif evt.type == "input_audio_buffer.speech_started":
+                await self._handle_evt_speech_started(evt)
+            elif evt.type == "input_audio_buffer.speech_stopped":
+                await self._handle_evt_speech_stopped(evt)
+            elif evt.type == "response.audio_transcript.delta":
+                await self._handle_evt_audio_transcript_delta(evt)
+            elif evt.type == "error":
+                await self._handle_evt_error(evt)
+                # errors are fatal, so exit the receive loop
+                return

     async def _handle_evt_session_created(self, evt):
         # session.created is received right after connecting. Send a message
@@ -165,16 +165,13 @@ class PlayHTTTSService(TTSService, WebsocketService):
     async def _connect(self):
         await self._connect_websocket()

-        self._receive_task = self.get_event_loop().create_task(
-            self._receive_task_handler(self.push_error)
-        )
+        self._receive_task = self.create_task(self._receive_task_handler(self.push_error))

     async def _disconnect(self):
         await self._disconnect_websocket()

         if self._receive_task:
-            self._receive_task.cancel()
-            await self._receive_task
+            await self.cancel_task(self._receive_task)
             self._receive_task = None

     async def _connect_websocket(self):
@@ -202,8 +202,8 @@ class ParakeetSTTService(STTService):

     async def start(self, frame: StartFrame):
         await super().start(frame)
-        self._thread_task = self.get_event_loop().create_task(self._thread_task_handler())
-        self._response_task = self.get_event_loop().create_task(self._response_task_handler())
+        self._thread_task = self.create_task(self._thread_task_handler())
+        self._response_task = self.create_task(self._response_task_handler())
         self._response_queue = asyncio.Queue()

     async def stop(self, frame: EndFrame):
@@ -215,10 +215,8 @@ class ParakeetSTTService(STTService):
         await self._stop_tasks()

     async def _stop_tasks(self):
-        self._thread_task.cancel()
-        await self._thread_task
-        self._response_task.cancel()
-        await self._response_task
+        await self.cancel_task(self._thread_task)
+        await self.cancel_task(self._response_task)

     def _response_handler(self):
         responses = self._asr_service.streaming_response_generator(
@@ -238,7 +236,7 @@ class ParakeetSTTService(STTService):
             await asyncio.to_thread(self._response_handler)
         except asyncio.CancelledError:
             self._thread_running = False
-            pass
+            raise

     async def _handle_response(self, response):
         for result in response.results:
@@ -260,11 +258,8 @@ class ParakeetSTTService(STTService):

     async def _response_task_handler(self):
         while True:
-            try:
-                response = await self._response_queue.get()
-                await self._handle_response(response)
-            except asyncio.CancelledError:
-                break
+            response = await self._response_queue.get()
+            await self._handle_response(response)

     async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
         await self._queue.put(audio)
||||
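One of the hunks above changes a handler that caught `asyncio.CancelledError` and then did `pass` so that it now does its cleanup and re-raises. The sketch below, using hypothetical worker names, illustrates the observable difference: a task that swallows the exception finishes "normally", while a task that re-raises ends in the cancelled state, which is what code awaiting the cancellation would presumably expect.

```python
import asyncio
from typing import Awaitable, Callable, Dict


async def swallowing_worker(state: Dict[str, bool]) -> None:
    """Old style: clean up, then silently absorb the cancellation."""
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        state["running"] = False


async def reraising_worker(state: Dict[str, bool]) -> None:
    """New style: clean up, then let the cancellation propagate."""
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        state["running"] = False
        raise


async def run_and_cancel(worker: Callable[[Dict[str, bool]], Awaitable[None]]) -> bool:
    state = {"running": True}
    task = asyncio.create_task(worker(state))
    await asyncio.sleep(0)  # let the worker reach its first await
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    assert state["running"] is False  # cleanup ran in both styles
    return task.cancelled()
```

Both workers run their cleanup, but only the re-raising one leaves `task.cancelled()` true; the swallowing worker's task simply completes with a `None` result.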
@@ -49,45 +49,33 @@ class SimliVideoService(FrameProcessor):
     async def _start_connection(self):
         await self._simli_client.Initialize()
         # Create task to consume and process audio and video
-        self._audio_task = asyncio.create_task(self._consume_and_process_audio())
-        self._video_task = asyncio.create_task(self._consume_and_process_video())
+        self._audio_task = self.create_task(self._consume_and_process_audio())
+        self._video_task = self.create_task(self._consume_and_process_video())

     async def _consume_and_process_audio(self):
-        try:
-            await self._pipecat_resampler_event.wait()
-            async for audio_frame in self._simli_client.getAudioStreamIterator():
-                resampled_frames = self._pipecat_resampler.resample(audio_frame)
-                for resampled_frame in resampled_frames:
-                    await self.push_frame(
-                        TTSAudioRawFrame(
-                            audio=resampled_frame.to_ndarray().tobytes(),
-                            sample_rate=self._pipecat_resampler.rate,
-                            num_channels=1,
-                        ),
-                    )
-        except Exception as e:
-            logger.exception(f"{self} exception: {e}")
-        except asyncio.CancelledError:
-            pass
+        await self._pipecat_resampler_event.wait()
+        async for audio_frame in self._simli_client.getAudioStreamIterator():
+            resampled_frames = self._pipecat_resampler.resample(audio_frame)
+            for resampled_frame in resampled_frames:
+                await self.push_frame(
+                    TTSAudioRawFrame(
+                        audio=resampled_frame.to_ndarray().tobytes(),
+                        sample_rate=self._pipecat_resampler.rate,
+                        num_channels=1,
+                    ),
+                )

     async def _consume_and_process_video(self):
-        try:
-            await self._pipecat_resampler_event.wait()
-            async for video_frame in self._simli_client.getVideoStreamIterator(
-                targetFormat="rgb24"
-            ):
-                # Process the video frame
-                convertedFrame: OutputImageRawFrame = OutputImageRawFrame(
-                    image=video_frame.to_rgb().to_image().tobytes(),
-                    size=(video_frame.width, video_frame.height),
-                    format="RGB",
-                )
-                convertedFrame.pts = video_frame.pts
-                await self.push_frame(convertedFrame)
-        except Exception as e:
-            logger.exception(f"{self} exception: {e}")
-        except asyncio.CancelledError:
-            pass
+        await self._pipecat_resampler_event.wait()
+        async for video_frame in self._simli_client.getVideoStreamIterator(targetFormat="rgb24"):
+            # Process the video frame
+            convertedFrame: OutputImageRawFrame = OutputImageRawFrame(
+                image=video_frame.to_rgb().to_image().tobytes(),
+                size=(video_frame.width, video_frame.height),
+                format="RGB",
+            )
+            convertedFrame.pts = video_frame.pts
+            await self.push_frame(convertedFrame)

     async def process_frame(self, frame: Frame, direction: FrameDirection):
         await super().process_frame(frame, direction)
@@ -128,8 +116,6 @@ class SimliVideoService(FrameProcessor):
     async def _stop(self):
         await self._simli_client.stop()
         if self._audio_task:
-            self._audio_task.cancel()
-            await self._audio_task
+            await self.cancel_task(self._audio_task)
         if self._video_task:
-            self._video_task.cancel()
-            await self._video_task
+            await self.cancel_task(self._video_task)
@@ -75,6 +75,14 @@ class TavusVideoService(AIService):
             logger.debug(f"TavusVideoService persona grabbed {response_json}")
             return response_json["persona_name"]

+    async def stop(self, frame: EndFrame):
+        await super().stop(frame)
+        await self._end_conversation()
+
+    async def cancel(self, frame: CancelFrame):
+        await super().cancel(frame)
+        await self._end_conversation()
+
     async def _end_conversation(self) -> None:
         url = f"https://tavusapi.com/v2/conversations/{self._conversation_id}/end"
         headers = {"Content-Type": "application/json", "x-api-key": self._api_key}
@@ -105,8 +113,6 @@ class TavusVideoService(AIService):
             await self.stop_processing_metrics()
         elif isinstance(frame, StartInterruptionFrame):
             await self._send_interrupt_message()
-        elif isinstance(frame, (EndFrame, CancelFrame)):
-            await self._end_conversation()
         else:
             await self.push_frame(frame, direction)

@@ -85,10 +85,6 @@ class WebsocketService(ABC):
                 await self._receive_messages()
                 logger.debug(f"{self} connection established successfully")
                 retry_count = 0  # Reset counter on successful message receive
-
-            except asyncio.CancelledError:
-                break
-
             except Exception as e:
                 retry_count += 1
                 if retry_count >= MAX_RETRIES:
@@ -50,13 +50,12 @@ class BaseInputTransport(FrameProcessor):
         # Create audio input queue and task if needed.
         if self._params.audio_in_enabled or self._params.vad_enabled:
             self._audio_in_queue = asyncio.Queue()
-            self._audio_task = self.get_event_loop().create_task(self._audio_task_handler())
+            self._audio_task = self.create_task(self._audio_task_handler())

     async def stop(self, frame: EndFrame):
         # Cancel and wait for the audio input task to finish.
         if self._audio_task and (self._params.audio_in_enabled or self._params.vad_enabled):
-            self._audio_task.cancel()
-            await self._audio_task
+            await self.cancel_task(self._audio_task)
             self._audio_task = None
         # Stop audio filter.
         if self._params.audio_in_filter:
@@ -65,8 +64,7 @@ class BaseInputTransport(FrameProcessor):
     async def cancel(self, frame: CancelFrame):
         # Cancel and wait for the audio input task to finish.
         if self._audio_task and (self._params.audio_in_enabled or self._params.vad_enabled):
-            self._audio_task.cancel()
-            await self._audio_task
+            await self.cancel_task(self._audio_task)
             self._audio_task = None

     def vad_analyzer(self) -> VADAnalyzer | None:
@@ -173,27 +171,22 @@ class BaseInputTransport(FrameProcessor):
     async def _audio_task_handler(self):
         vad_state: VADState = VADState.QUIET
         while True:
-            try:
-                frame: InputAudioRawFrame = await self._audio_in_queue.get()
+            frame: InputAudioRawFrame = await self._audio_in_queue.get()

-                audio_passthrough = True
+            audio_passthrough = True

-                # If an audio filter is available, run it before VAD.
-                if self._params.audio_in_filter:
-                    frame.audio = await self._params.audio_in_filter.filter(frame.audio)
+            # If an audio filter is available, run it before VAD.
+            if self._params.audio_in_filter:
+                frame.audio = await self._params.audio_in_filter.filter(frame.audio)

-                # Check VAD and push event if necessary. We just care about
-                # changes from QUIET to SPEAKING and vice versa.
-                if self._params.vad_enabled:
-                    vad_state = await self._handle_vad(frame, vad_state)
-                    audio_passthrough = self._params.vad_audio_passthrough
+            # Check VAD and push event if necessary. We just care about
+            # changes from QUIET to SPEAKING and vice versa.
+            if self._params.vad_enabled:
+                vad_state = await self._handle_vad(frame, vad_state)
+                audio_passthrough = self._params.vad_audio_passthrough

-                # Push audio downstream if passthrough.
-                if audio_passthrough:
-                    await self.push_frame(frame)
+            # Push audio downstream if passthrough.
+            if audio_passthrough:
+                await self.push_frame(frame)

-                self._audio_in_queue.task_done()
-            except asyncio.CancelledError:
-                break
-            except Exception as e:
-                logger.exception(f"{self} error reading audio frames: {e}")
+            self._audio_in_queue.task_done()
@@ -35,6 +35,7 @@ from pipecat.frames.frames import (
 )
 from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
 from pipecat.transports.base_transport import TransportParams
+from pipecat.utils.asyncio import wait_for_task
 from pipecat.utils.time import nanoseconds_to_seconds


@@ -87,9 +88,9 @@ class BaseOutputTransport(FrameProcessor):
         # for these tasks before cancelling the camera and audio tasks below
         # because they might be still rendering.
         if self._sink_task:
-            await self._sink_task
+            await wait_for_task(self._sink_task)
         if self._sink_clock_task:
-            await self._sink_clock_task
+            await wait_for_task(self._sink_clock_task)

         # We can now cancel the camera task.
         await self._cancel_camera_task()
@@ -217,22 +218,19 @@ class BaseOutputTransport(FrameProcessor):
     #

     def _create_sink_tasks(self):
-        loop = self.get_event_loop()
         self._sink_queue = asyncio.Queue()
-        self._sink_task = loop.create_task(self._sink_task_handler())
         self._sink_clock_queue = asyncio.PriorityQueue()
-        self._sink_clock_task = loop.create_task(self._sink_clock_task_handler())
+        self._sink_task = self.create_task(self._sink_task_handler())
+        self._sink_clock_task = self.create_task(self._sink_clock_task_handler())

     async def _cancel_sink_tasks(self):
         # Stop sink tasks.
         if self._sink_task:
-            self._sink_task.cancel()
-            await self._sink_task
+            await self.cancel_task(self._sink_task)
             self._sink_task = None
         # Stop sink clock tasks.
         if self._sink_clock_task:
-            self._sink_clock_task.cancel()
-            await self._sink_clock_task
+            await self.cancel_task(self._sink_clock_task)
             self._sink_clock_task = None

     async def _sink_frame_handler(self, frame: Frame):
@@ -269,7 +267,7 @@ class BaseOutputTransport(FrameProcessor):

                 self._sink_clock_queue.task_done()
             except asyncio.CancelledError:
-                break
+                raise
             except Exception as e:
                 logger.exception(f"{self} error processing sink clock queue: {e}")

@@ -317,49 +315,42 @@ class BaseOutputTransport(FrameProcessor):
         return without_mixer(vad_stop_secs)

     async def _sink_task_handler(self):
-        try:
-            async for frame in self._next_frame():
-                # Notify the bot started speaking upstream if necessary and that
-                # it's actually speaking.
-                if isinstance(frame, TTSAudioRawFrame):
-                    await self._bot_started_speaking()
-                    await self.push_frame(BotSpeakingFrame())
-                    await self.push_frame(BotSpeakingFrame(), FrameDirection.UPSTREAM)
+        async for frame in self._next_frame():
+            # Notify the bot started speaking upstream if necessary and that
+            # it's actually speaking.
+            if isinstance(frame, TTSAudioRawFrame):
+                await self._bot_started_speaking()
+                await self.push_frame(BotSpeakingFrame())
+                await self.push_frame(BotSpeakingFrame(), FrameDirection.UPSTREAM)

-                # No need to push EndFrame, it's pushed from process_frame().
-                if isinstance(frame, EndFrame):
-                    break
+            # No need to push EndFrame, it's pushed from process_frame().
+            if isinstance(frame, EndFrame):
+                break

-                # Handle frame.
-                await self._sink_frame_handler(frame)
+            # Handle frame.
+            await self._sink_frame_handler(frame)

-                # Also, push frame downstream in case anyone else needs it.
-                await self.push_frame(frame)
+            # Also, push frame downstream in case anyone else needs it.
+            await self.push_frame(frame)

-                # Send audio.
-                if isinstance(frame, OutputAudioRawFrame):
-                    await self.write_raw_audio_frames(frame.audio)
-        except asyncio.CancelledError:
-            pass
-        except Exception as e:
-            logger.exception(f"{self} error writing to microphone: {e}")
+            # Send audio.
+            if isinstance(frame, OutputAudioRawFrame):
+                await self.write_raw_audio_frames(frame.audio)

     #
     # Camera task
     #

     def _create_camera_task(self):
-        loop = self.get_event_loop()
         # Create camera output queue and task if needed.
         if self._params.camera_out_enabled:
             self._camera_out_queue = asyncio.Queue()
-            self._camera_out_task = loop.create_task(self._camera_out_task_handler())
+            self._camera_out_task = self.create_task(self._camera_out_task_handler())

     async def _cancel_camera_task(self):
         # Stop camera output task.
         if self._camera_out_task and self._params.camera_out_enabled:
-            self._camera_out_task.cancel()
-            await self._camera_out_task
+            await self.cancel_task(self._camera_out_task)
             self._camera_out_task = None

     async def _draw_image(self, frame: OutputImageRawFrame):
@@ -387,19 +378,14 @@ class BaseOutputTransport(FrameProcessor):
         self._camera_out_frame_duration = 1 / self._params.camera_out_framerate
         self._camera_out_frame_reset = self._camera_out_frame_duration * 5
         while True:
-            try:
-                if self._params.camera_out_is_live:
-                    await self._camera_out_is_live_handler()
-                elif self._camera_images:
-                    image = next(self._camera_images)
-                    await self._draw_image(image)
-                    await asyncio.sleep(self._camera_out_frame_duration)
-                else:
-                    await asyncio.sleep(self._camera_out_frame_duration)
-            except asyncio.CancelledError:
-                break
-            except Exception as e:
-                logger.exception(f"{self} error writing to camera: {e}")
+            if self._params.camera_out_is_live:
+                await self._camera_out_is_live_handler()
+            elif self._camera_images:
+                image = next(self._camera_images)
+                await self._draw_image(image)
+                await asyncio.sleep(self._camera_out_frame_duration)
+            else:
+                await asyncio.sleep(self._camera_out_frame_duration)

     async def _camera_out_is_live_handler(self):
         image = await self._camera_out_queue.get()
@@ -16,6 +16,7 @@ from pipecat.audio.filters.base_audio_filter import BaseAudioFilter
 from pipecat.audio.mixers.base_audio_mixer import BaseAudioMixer
 from pipecat.audio.vad.vad_analyzer import VADAnalyzer
 from pipecat.processors.frame_processor import FrameProcessor
+from pipecat.utils.utils import obj_count, obj_id


 class TransportParams(BaseModel):
@@ -46,15 +47,27 @@ class TransportParams(BaseModel):
 class BaseTransport(ABC):
     def __init__(
         self,
-        input_name: str | None = None,
-        output_name: str | None = None,
-        loop: asyncio.AbstractEventLoop | None = None,
+        *,
+        name: Optional[str] = None,
+        input_name: Optional[str] = None,
+        output_name: Optional[str] = None,
+        loop: Optional[asyncio.AbstractEventLoop] = None,
     ):
+        self._id: int = obj_id()
+        self._name = name or f"{self.__class__.__name__}#{obj_count(self)}"
         self._input_name = input_name
         self._output_name = output_name
         self._loop = loop or asyncio.get_running_loop()
         self._event_handlers: dict = {}

+    @property
+    def id(self) -> int:
+        return self._id
+
+    @property
+    def name(self) -> str:
+        return self._name
+
     @abstractmethod
     def input(self) -> FrameProcessor:
         raise NotImplementedError
@@ -89,3 +102,6 @@ class BaseTransport(ABC):
                     handler(self, *args, **kwargs)
         except Exception as e:
             logger.exception(f"Exception in event handler {event_name}: {e}")
+
+    def __str__(self):
+        return self.name
||||
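The `BaseTransport` hunk above gives every transport an id and a default name of the form `ClassName#N`, and makes `str(transport)` return that name. The helpers `obj_id` and `obj_count` come from `pipecat.utils.utils` and their bodies are not part of this diff, so the sketch below substitutes hypothetical stand-ins (`next_id`, `next_count`) built on `itertools.count`. The starting value of the counters is an assumption.

```python
import itertools
from collections import defaultdict
from typing import Optional

# Hypothetical stand-ins for pipecat's obj_id() / obj_count(); the real
# helpers may number objects differently.
_id_counter = itertools.count()
_class_counters = defaultdict(itertools.count)


def next_id() -> int:
    """Return a process-wide unique integer id."""
    return next(_id_counter)


def next_count(obj: object) -> int:
    """Return a per-class running count for `obj`'s class."""
    return next(_class_counters[obj.__class__.__name__])


class NamedTransport:
    """Minimal illustration of the naming scheme in the hunk above."""

    def __init__(self, *, name: Optional[str] = None):
        self._id: int = next_id()
        # An explicit name wins; otherwise fall back to "ClassName#N".
        self._name = name or f"{self.__class__.__name__}#{next_count(self)}"

    @property
    def id(self) -> int:
        return self._id

    @property
    def name(self) -> str:
        return self._name

    def __str__(self) -> str:
        return self.name
```

With these stand-ins, two default-constructed instances are named `NamedTransport#0` and `NamedTransport#1`, while `NamedTransport(name="custom")` keeps its explicit name. Making the constructor arguments keyword-only, as the hunk does, is also why the local transports in the next hunks need an explicit `super().__init__()` call.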
@@ -98,6 +98,7 @@ class LocalAudioOutputTransport(BaseOutputTransport):

 class LocalAudioTransport(BaseTransport):
     def __init__(self, params: TransportParams):
+        super().__init__()
         self._params = params
         self._pyaudio = pyaudio.PyAudio()

@@ -127,6 +127,7 @@ class TkOutputTransport(BaseOutputTransport):

 class TkLocalTransport(BaseTransport):
     def __init__(self, tk_root: tk.Tk, params: TransportParams):
+        super().__init__()
         self._tk_root = tk_root
         self._params = params
         self._pyaudio = pyaudio.PyAudio()
@@ -16,6 +16,8 @@ from loguru import logger
 from pydantic import BaseModel

 from pipecat.frames.frames import (
+    CancelFrame,
+    EndFrame,
     Frame,
     InputAudioRawFrame,
     OutputAudioRawFrame,
@@ -27,6 +29,7 @@ from pipecat.serializers.base_serializer import FrameSerializer, FrameSerializer
 from pipecat.transports.base_input import BaseInputTransport
 from pipecat.transports.base_output import BaseOutputTransport
 from pipecat.transports.base_transport import BaseTransport, TransportParams
+from pipecat.utils.asyncio import cancel_task

 try:
     from fastapi import WebSocket
@@ -68,11 +71,17 @@ class FastAPIWebsocketInputTransport(BaseInputTransport):
     async def start(self, frame: StartFrame):
         await super().start(frame)
         if self._params.session_timeout:
-            self._monitor_websocket_task = self.get_event_loop().create_task(
-                self._monitor_websocket()
-            )
+            self._monitor_websocket_task = self.create_task(self._monitor_websocket())
         await self._callbacks.on_client_connected(self._websocket)
-        self._receive_task = self.get_event_loop().create_task(self._receive_messages())
+        self._receive_task = self.create_task(self._receive_messages())
+
+    async def stop(self, frame: EndFrame):
+        await super().stop(frame)
+        await cancel_task(self._receive_task)
+
+    async def cancel(self, frame: CancelFrame):
+        await super().cancel(frame)
+        await cancel_task(self._receive_task)

     def _iter_data(self) -> typing.AsyncIterator[bytes | str]:
         if self._params.serializer.type == FrameSerializerType.BINARY:
@@ -96,11 +105,8 @@ class FastAPIWebsocketInputTransport(BaseInputTransport):

     async def _monitor_websocket(self):
         """Wait for self._params.session_timeout seconds, if the websocket is still open, trigger timeout event."""
-        try:
-            await asyncio.sleep(self._params.session_timeout)
-            await self._callbacks.on_session_timeout(self._websocket)
-        except asyncio.CancelledError:
-            logger.info(f"Monitoring task cancelled for: {self._websocket}")
+        await asyncio.sleep(self._params.session_timeout)
+        await self._callbacks.on_session_timeout(self._websocket)


 class FastAPIWebsocketOutputTransport(BaseOutputTransport):
@@ -71,17 +71,16 @@ class WebsocketServerInputTransport(BaseInputTransport):

     async def start(self, frame: StartFrame):
         await super().start(frame)
-        self._server_task = self.get_event_loop().create_task(self._server_task_handler())
+        self._server_task = self.create_task(self._server_task_handler())

     async def stop(self, frame: EndFrame):
         await super().stop(frame)
         self._stop_server_event.set()
-        await self._server_task
+        await self.wait_for_task(self._server_task)

     async def cancel(self, frame: CancelFrame):
         await super().cancel(frame)
-        self._stop_server_event.set()
-        await self._server_task
+        await self.cancel_task(self._server_task)

     async def _server_task_handler(self):
         logger.info(f"Starting websocket server on {self._host}:{self._port}")
@@ -131,6 +130,7 @@ class WebsocketServerInputTransport(BaseInputTransport):
             await self._callbacks.on_session_timeout(websocket)
         except asyncio.CancelledError:
             logger.info(f"Monitoring task cancelled for: {websocket.remote_address}")
+            raise


 class WebsocketServerOutputTransport(BaseOutputTransport):
@@ -46,6 +46,7 @@ from pipecat.transcriptions.language import Language
 from pipecat.transports.base_input import BaseInputTransport
 from pipecat.transports.base_output import BaseOutputTransport
 from pipecat.transports.base_transport import BaseTransport, TransportParams
+from pipecat.utils.asyncio import cancel_task, create_task

 try:
     from daily import CallClient, Daily, EventHandler
@@ -180,6 +181,7 @@ class DailyTransportClient(EventHandler):
         params: DailyParams,
         callbacks: DailyCallbacks,
         loop: asyncio.AbstractEventLoop,
+        transport_name: str,
     ):
         super().__init__()

@@ -193,6 +195,7 @@ class DailyTransportClient(EventHandler):
         self._params: DailyParams = params
         self._callbacks = callbacks
         self._loop = loop
+        self._transport_name = transport_name

         self._participant_id: str = ""
         self._video_renderers = {}
@@ -218,7 +221,11 @@ class DailyTransportClient(EventHandler):
         # future) we will deadlock because completions use event handlers (which
         # are holding the GIL).
         self._callback_queue = asyncio.Queue()
-        self._callback_task = self._loop.create_task(self._callback_task_handler())
+        self._callback_task = create_task(
+            self._loop,
+            self._callback_task_handler(),
+            f"{self._transport_name}::DailyTransportClient::callback_task",
+        )

         self._camera: VirtualCameraDevice | None = None
         if self._params.camera_out_enabled:
@@ -469,8 +476,9 @@ class DailyTransportClient(EventHandler):
         return await asyncio.wait_for(future, timeout=10)

     async def cleanup(self):
-        self._callback_task.cancel()
-        await self._callback_task
+        if self._callback_task:
+            await cancel_task(self._callback_task)
+            self._callback_task = None
         # Make sure we don't block the event loop in case `client.release()`
         # takes extra time.
         await self._loop.run_in_executor(self._executor, self._cleanup)
@@ -687,11 +695,8 @@ class DailyTransportClient(EventHandler):

     async def _callback_task_handler(self):
         while True:
-            try:
-                (callback, *args) = await self._callback_queue.get()
-                await callback(*args)
-            except asyncio.CancelledError:
-                break
+            (callback, *args) = await self._callback_queue.get()
+            await callback(*args)


 class DailyInputTransport(BaseInputTransport):
@@ -721,7 +726,7 @@ class DailyInputTransport(BaseInputTransport):
         # Create audio task. It reads audio frames from Daily and push them
         # internally for VAD processing.
         if self._params.audio_in_enabled or self._params.vad_enabled:
-            self._audio_in_task = self.get_event_loop().create_task(self._audio_in_task_handler())
+            self._audio_in_task = self.create_task(self._audio_in_task_handler())

     async def stop(self, frame: EndFrame):
         # Parent stop.
@@ -730,8 +735,7 @@ class DailyInputTransport(BaseInputTransport):
         await self._client.leave()
         # Stop audio thread.
         if self._audio_in_task and (self._params.audio_in_enabled or self._params.vad_enabled):
-            self._audio_in_task.cancel()
-            await self._audio_in_task
+            await self.cancel_task(self._audio_in_task)
             self._audio_in_task = None

     async def cancel(self, frame: CancelFrame):
@@ -741,8 +745,7 @@ class DailyInputTransport(BaseInputTransport):
         await self._client.leave()
         # Stop audio thread.
         if self._audio_in_task and (self._params.audio_in_enabled or self._params.vad_enabled):
-            self._audio_in_task.cancel()
-            await self._audio_in_task
+            await self.cancel_task(self._audio_in_task)
             self._audio_in_task = None

     async def cleanup(self):
@@ -779,12 +782,9 @@ class DailyInputTransport(BaseInputTransport):

     async def _audio_in_task_handler(self):
         while True:
-            try:
-                frame = await self._client.read_next_audio_frame()
-                if frame:
-                    await self.push_audio_frame(frame)
-            except asyncio.CancelledError:
-                break
+            frame = await self._client.read_next_audio_frame()
+            if frame:
+                await self.push_audio_frame(frame)

     #
     # Camera in
@@ -913,7 +913,7 @@ class DailyTransport(BaseTransport):
         self._params = params

         self._client = DailyTransportClient(
-            room_url, token, bot_name, params, callbacks, self._loop
+            room_url, token, bot_name, params, callbacks, self._loop, self.name
         )
         self._input: DailyInputTransport | None = None
         self._output: DailyOutputTransport | None = None
||||
@@ -33,6 +33,19 @@ class DailyRoomSipParams(BaseModel):
|
||||
num_endpoints: int = 1
|
||||
|
||||
|
||||
class RecordingsBucketConfig(BaseModel):
|
||||
"""Configuration for storing Daily recordings in a custom S3 bucket.
|
||||
|
||||
Refer to the Daily API documentation for more information:
|
||||
https://docs.daily.co/guides/products/live-streaming-recording/storing-recordings-in-a-custom-s3-bucket
|
||||
"""
|
||||
|
||||
bucket_name: str
|
||||
bucket_region: str
|
||||
assume_role_arn: str
|
||||
allow_api_access: bool = False
|
||||
|
||||
|
||||
class DailyRoomProperties(BaseModel, extra="allow"):
|
||||
"""Properties for configuring a Daily room.
|
||||
|
||||
@@ -43,6 +56,8 @@ class DailyRoomProperties(BaseModel, extra="allow"):
|
||||
enable_emoji_reactions: Whether emoji reactions are enabled
|
||||
eject_at_room_exp: Whether to remove participants when room expires
|
||||
enable_dialout: Whether SIP dial-out is enabled
|
||||
enable_recording: Recording settings ('cloud', 'local', 'raw-tracks')
|
||||
geo: Geographic region for room
|
||||
max_participants: Maximum number of participants allowed in the room
|
||||
sip: SIP configuration parameters
|
||||
sip_uri: SIP URI information returned by Daily
|
||||
@@ -57,7 +72,10 @@ class DailyRoomProperties(BaseModel, extra="allow"):
|
||||
enable_emoji_reactions: bool = False
|
||||
eject_at_room_exp: bool = True
|
||||
enable_dialout: Optional[bool] = None
|
||||
enable_recording: Optional[Literal["cloud", "local", "raw-tracks"]] = None
|
||||
geo: Optional[str] = None
|
||||
max_participants: Optional[int] = None
|
||||
recordings_bucket: Optional[RecordingsBucketConfig] = None
|
||||
sip: Optional[DailyRoomSipParams] = None
|
||||
sip_uri: Optional[dict] = None
|
||||
start_video_off: bool = False
|
||||
@@ -111,6 +129,84 @@ class DailyRoomObject(BaseModel):
|
||||
config: DailyRoomProperties
|
||||
|
||||
|
||||
class DailyMeetingTokenProperties(BaseModel):
|
||||
"""Properties for configuring a Daily meeting token.
|
||||
|
||||
Refer to the Daily API documentation for more information:
|
||||
https://docs.daily.co/reference/rest-api/meeting-tokens/create-meeting-token#properties
|
||||
"""
|
||||
|
||||
room_name: Optional[str] = Field(
|
||||
default=None,
|
||||
description="The room for which this token is valid. If not set, the token is valid for all rooms in your domain. You should always set room_name if using this token to control meeting access.",
|
||||
)
|
||||
|
||||
eject_at_token_exp: Optional[bool] = Field(
|
||||
default=None,
|
||||
description="If `true`, the user will be ejected from the room when the token expires. Defaults to `false`.",
|
||||
)
|
||||
eject_after_elapsed: Optional[int] = Field(
|
||||
default=None,
|
||||
description="The number of seconds after which the user will be ejected from the room. If not provided, the user will not be ejected based on elapsed time.",
|
||||
)
|
||||
|
||||
nbf: Optional[int] = Field(
|
||||
default=None,
|
||||
description="Not before. This is a unix timestamp (seconds since the epoch.) Users cannot join a meeting in with this token before this time.",
|
||||
)
|
||||
|
||||
exp: Optional[int] = Field(
|
||||
default=None,
|
||||
description="Expiration time (unix timestamp in seconds). We strongly recommend setting this value for security. If not set, the token will not expire. Refer docs for more info.",
|
||||
)
|
||||
is_owner: Optional[bool] = Field(
|
||||
default=None,
|
||||
description="If `true`, the token will grant owner privileges in the room. Defaults to `false`.",
|
||||
)
|
||||
user_name: Optional[str] = Field(
|
||||
default=None,
|
||||
description="The name of the user. This will be added to the token payload.",
|
||||
)
|
||||
user_id: Optional[str] = Field(
|
||||
default=None,
|
||||
description="A unique identifier for the user. This will be added to the token payload.",
|
||||
)
|
||||
enable_screenshare: Optional[bool] = Field(
|
||||
default=None,
|
||||
description="If `true`, the user will be able to share their screen. Defaults to `true`.",
|
||||
)
|
||||
start_video_off: Optional[bool] = Field(
|
||||
default=None,
|
||||
description="If `true`, the user's video will be turned off when they join the room. Defaults to `false`.",
|
||||
)
|
||||
start_audio_off: Optional[bool] = Field(
|
||||
default=None,
|
||||
description="If `true`, the user's audio will be turned off when they join the room. Defaults to `false`.",
|
||||
)
|
||||
enable_recording: Optional[Literal["cloud", "local", "raw-tracks"]] = Field(
|
||||
default=None,
|
||||
description="Recording settings for the token. Must be one of `cloud`, `local` or `raw-tracks`.",
|
||||
)
|
||||
enable_prejoin_ui: Optional[bool] = Field(
|
||||
default=None,
|
||||
description="If `true`, the user will see the prejoin UI before joining the room.",
|
||||
)
|
||||
start_cloud_recording: Optional[bool] = Field(
|
||||
default=None,
|
||||
description="Start cloud recording when the user joins the room. This can be used to always record and archive meetings, for example in a customer support context.",
|
||||
)
|
||||
|
||||
|
||||
class DailyMeetingTokenParams(BaseModel):
|
||||
"""Parameters for creating a Daily meeting token.
|
||||
|
||||
Refer to the Daily API documentation for more information:
|
||||
https://docs.daily.co/reference/rest-api/meeting-tokens/create-meeting-token#body-params
|
||||
"""
|
||||
|
||||
properties: DailyMeetingTokenProperties = Field(default_factory=DailyMeetingTokenProperties)
|
||||
|
||||
|
||||
class DailyRESTHelper:
|
||||
"""Helper class for interacting with Daily's REST API.
|
||||
|
||||
@@ -129,6 +225,7 @@ class DailyRESTHelper:
|
||||
daily_api_url: str = "https://api.daily.co/v1",
|
||||
aiohttp_session: aiohttp.ClientSession,
|
||||
):
|
||||
"""Initialize the Daily REST helper."""
|
||||
self.daily_api_key = daily_api_key
|
||||
self.daily_api_url = daily_api_url
|
||||
self.aiohttp_session = aiohttp_session
|
||||
@@ -169,7 +266,7 @@ class DailyRESTHelper:
|
||||
Exception: If room creation fails or response is invalid
|
||||
"""
|
||||
headers = {"Authorization": f"Bearer {self.daily_api_key}"}
|
||||
json = {**params.model_dump(exclude_none=True)}
|
||||
json = params.model_dump(exclude_none=True)
|
||||
async with self.aiohttp_session.post(
|
||||
f"{self.daily_api_url}/rooms", headers=headers, json=json
|
||||
) as r:
|
||||
@@ -187,7 +284,11 @@ class DailyRESTHelper:
|
||||
return room
|
||||
|
||||
async def get_token(
|
||||
self, room_url: str, expiry_time: float = 60 * 60, owner: bool = True
|
||||
self,
|
||||
room_url: str,
|
||||
expiry_time: float = 60 * 60,
|
||||
owner: bool = True,
|
||||
params: Optional[DailyMeetingTokenParams] = None,
|
||||
) -> str:
|
||||
"""Generate a meeting token for user to join a Daily room.
|
||||
|
||||
@@ -195,6 +296,9 @@ class DailyRESTHelper:
|
||||
room_url: Daily room URL
|
||||
expiry_time: Token validity duration in seconds (default: 1 hour)
|
||||
owner: Whether token has owner privileges
|
||||
params: Optional additional token properties. Note that room_name,
|
||||
exp, and is_owner will be set based on the other function
|
||||
parameters regardless of values in params.
|
||||
|
||||
Returns:
|
||||
str: Meeting token
|
||||
@@ -207,12 +311,25 @@ class DailyRESTHelper:
|
||||
"No Daily room specified. You must specify a Daily room in order a token to be generated."
|
||||
)
|
||||
|
||||
expiration: float = time.time() + expiry_time
|
||||
expiration: int = int(time.time() + expiry_time)
|
||||
|
||||
room_name = self.get_name_from_url(room_url)
|
||||
|
||||
headers = {"Authorization": f"Bearer {self.daily_api_key}"}
|
||||
json = {"properties": {"room_name": room_name, "is_owner": owner, "exp": expiration}}
|
||||
|
||||
if params is None:
|
||||
params = DailyMeetingTokenParams(
|
||||
properties=DailyMeetingTokenProperties(
|
||||
room_name=room_name, is_owner=owner, exp=expiration
|
||||
)
|
||||
)
|
||||
else:
|
||||
params.properties.room_name = room_name
|
||||
params.properties.exp = expiration
|
||||
params.properties.is_owner = owner
|
||||
|
||||
json = params.model_dump(exclude_none=True)
|
||||
|
||||
async with self.aiohttp_session.post(
|
||||
f"{self.daily_api_url}/meeting-tokens", headers=headers, json=json
|
||||
) as r:
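The override rule in the `get_token` hunk above (`room_name`, `exp` and `is_owner` always come from the function arguments, and unset fields are dropped from the request body) can be sketched without any dependencies. This is only an illustration under stated assumptions: `TokenProperties`, `TokenParams` and `build_token_body` are hypothetical stand-ins built on dataclasses, not the Pydantic models or the helper from the diff, and the `now` argument exists only to make the result deterministic.

```python
import time
from dataclasses import asdict, dataclass, field
from typing import Optional


@dataclass
class TokenProperties:
    # Hypothetical stand-in for a small subset of DailyMeetingTokenProperties.
    room_name: Optional[str] = None
    exp: Optional[int] = None
    is_owner: Optional[bool] = None
    user_name: Optional[str] = None


@dataclass
class TokenParams:
    # Hypothetical stand-in for DailyMeetingTokenParams.
    properties: TokenProperties = field(default_factory=TokenProperties)


def build_token_body(
    room_name: str,
    owner: bool,
    expiry_time: float,
    params: Optional[TokenParams] = None,
    now: Optional[float] = None,
) -> dict:
    """Build a request body, letting the explicit arguments win over params."""
    now = time.time() if now is None else now
    # The expiration is truncated to whole seconds, as in the diff.
    expiration = int(now + expiry_time)

    if params is None:
        params = TokenParams(
            properties=TokenProperties(room_name=room_name, is_owner=owner, exp=expiration)
        )
    else:
        # Note: this mutates the caller's object, mirroring the hunk above.
        params.properties.room_name = room_name
        params.properties.exp = expiration
        params.properties.is_owner = owner

    # Rough equivalent of model_dump(exclude_none=True) for this flat case.
    props = {k: v for k, v in asdict(params.properties).items() if v is not None}
    return {"properties": props}


custom = TokenParams(properties=TokenProperties(room_name="ignored", user_name="bot"))
body = build_token_body("demo-room", True, 3600, custom, now=1000.5)
```

One consequence worth noticing is that a caller-supplied `room_name` is silently replaced, and the caller's `params` object is modified in place, so reusing it for another room is safe only because the three fields are rewritten on every call.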
|
||||
|
||||
@@ -28,6 +28,7 @@ from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.transports.base_input import BaseInputTransport
|
||||
from pipecat.transports.base_output import BaseOutputTransport
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.utils.asyncio import create_task
|
||||
|
||||
try:
|
||||
from livekit import rtc
|
||||
@@ -72,6 +73,7 @@ class LiveKitTransportClient:
|
||||
params: LiveKitParams,
|
||||
callbacks: LiveKitCallbacks,
|
||||
loop: asyncio.AbstractEventLoop,
|
||||
transport_name: str,
|
||||
):
|
||||
self._url = url
|
||||
self._token = token
|
||||
@@ -79,6 +81,7 @@ class LiveKitTransportClient:
|
||||
self._params = params
|
||||
self._callbacks = callbacks
|
||||
self._loop = loop
|
||||
self._transport_name = transport_name
|
||||
self._room = rtc.Room(loop=loop)
|
||||
self._participant_id: str = ""
|
||||
self._connected = False
|
||||
@@ -215,10 +218,18 @@ class LiveKitTransportClient:
|
||||
|
||||
# Wrapper methods for event handlers
|
||||
def _on_participant_connected_wrapper(self, participant: rtc.RemoteParticipant):
|
||||
asyncio.create_task(self._async_on_participant_connected(participant))
|
||||
create_task(
|
||||
self._loop,
|
||||
self._async_on_participant_connected(participant),
|
||||
f"{self._transport_name}::LiveKitTransportClient::_async_on_participant_connected",
|
||||
)
|
||||
|
||||
def _on_participant_disconnected_wrapper(self, participant: rtc.RemoteParticipant):
|
||||
asyncio.create_task(self._async_on_participant_disconnected(participant))
|
||||
create_task(
|
||||
self._loop,
|
||||
self._async_on_participant_disconnected(participant),
|
||||
f"{self._transport_name}::LiveKitTransportClient::_async_on_participant_disconnected",
|
||||
)
|
||||
|
||||
def _on_track_subscribed_wrapper(
|
||||
self,
|
||||
@@ -226,7 +237,11 @@ class LiveKitTransportClient:
|
||||
publication: rtc.RemoteTrackPublication,
|
||||
participant: rtc.RemoteParticipant,
|
||||
):
|
||||
asyncio.create_task(self._async_on_track_subscribed(track, publication, participant))
|
||||
create_task(
|
||||
self._loop,
|
||||
self._async_on_track_subscribed(track, publication, participant),
|
||||
f"{self._transport_name}::LiveKitTransportClient::_async_on_track_subscribed",
|
||||
)
|
||||
|
||||
def _on_track_unsubscribed_wrapper(
|
||||
self,
|
||||
@@ -234,16 +249,32 @@ class LiveKitTransportClient:
|
||||
publication: rtc.RemoteTrackPublication,
|
||||
participant: rtc.RemoteParticipant,
|
||||
):
|
||||
asyncio.create_task(self._async_on_track_unsubscribed(track, publication, participant))
|
||||
create_task(
|
||||
self._loop,
|
||||
self._async_on_track_unsubscribed(track, publication, participant),
|
||||
f"{self._transport_name}::LiveKitTransportClient::_async_on_track_unsubscribed",
|
||||
)
|
||||
|
||||
def _on_data_received_wrapper(self, data: rtc.DataPacket):
|
||||
asyncio.create_task(self._async_on_data_received(data))
|
||||
create_task(
|
||||
self._loop,
|
||||
self._async_on_data_received(data),
|
||||
f"{self._transport_name}::LiveKitTransportClient::_async_on_data_received",
|
||||
)
|
||||
|
||||
def _on_connected_wrapper(self):
|
||||
asyncio.create_task(self._async_on_connected())
|
||||
create_task(
|
||||
self._loop,
|
||||
self._async_on_connected(),
|
||||
f"{self._transport_name}::LiveKitTransportClient::_async_on_connected",
|
||||
)
|
||||
|
||||
def _on_disconnected_wrapper(self):
|
||||
asyncio.create_task(self._async_on_disconnected())
|
||||
create_task(
|
||||
self._loop,
|
||||
self._async_on_disconnected(),
|
||||
f"{self._transport_name}::LiveKitTransportClient::_async_on_disconnected",
|
||||
)
|
||||
|
||||
# Async methods for event handling
|
||||
async def _async_on_participant_connected(self, participant: rtc.RemoteParticipant):
|
||||
@@ -269,7 +300,11 @@ class LiveKitTransportClient:
|
||||
logger.info(f"Audio track subscribed: {track.sid} from participant {participant.sid}")
|
||||
self._audio_tracks[participant.sid] = track
|
||||
audio_stream = rtc.AudioStream(track)
|
||||
asyncio.create_task(self._process_audio_stream(audio_stream, participant.sid))
|
||||
create_task(
|
||||
self._loop,
|
||||
self._process_audio_stream(audio_stream, participant.sid),
|
||||
f"{self._transport_name}::LiveKitTransportClient::_process_audio_stream",
|
||||
)
|
||||
|
||||
async def _async_on_track_unsubscribed(
|
||||
self,
|
||||
@@ -319,23 +354,21 @@ class LiveKitInputTransport(BaseInputTransport):
|
||||
await super().start(frame)
|
||||
await self._client.connect()
|
||||
if self._params.audio_in_enabled or self._params.vad_enabled:
|
||||
self._audio_in_task = asyncio.create_task(self._audio_in_task_handler())
|
||||
self._audio_in_task = self.create_task(self._audio_in_task_handler())
|
||||
logger.info("LiveKitInputTransport started")
|
||||
|
||||
async def stop(self, frame: EndFrame):
|
||||
await super().stop(frame)
|
||||
await self._client.disconnect()
|
||||
if self._audio_in_task:
|
||||
self._audio_in_task.cancel()
|
||||
await self._audio_in_task
|
||||
await self.cancel_task(self._audio_in_task)
|
||||
logger.info("LiveKitInputTransport stopped")
|
||||
|
||||
async def cancel(self, frame: CancelFrame):
|
||||
await super().cancel(frame)
|
||||
await self._client.disconnect()
|
||||
if self._audio_in_task and (self._params.audio_in_enabled or self._params.vad_enabled):
|
||||
self._audio_in_task.cancel()
|
||||
await self._audio_in_task
|
||||
await self.cancel_task(self._audio_in_task)
|
||||
|
||||
def vad_analyzer(self) -> VADAnalyzer | None:
|
||||
return self._vad_analyzer
|
||||
@@ -347,22 +380,16 @@ class LiveKitInputTransport(BaseInputTransport):
|
||||
async def _audio_in_task_handler(self):
|
||||
logger.info("Audio input task started")
|
||||
while True:
|
||||
try:
|
||||
audio_data = await self._client.get_next_audio_frame()
|
||||
if audio_data:
|
||||
audio_frame_event, participant_id = audio_data
|
||||
pipecat_audio_frame = self._convert_livekit_audio_to_pipecat(audio_frame_event)
|
||||
input_audio_frame = InputAudioRawFrame(
|
||||
audio=pipecat_audio_frame.audio,
|
||||
sample_rate=pipecat_audio_frame.sample_rate,
|
||||
num_channels=pipecat_audio_frame.num_channels,
|
||||
)
|
||||
await self.push_audio_frame(input_audio_frame)
|
||||
except asyncio.CancelledError:
|
||||
logger.info("Audio input task cancelled")
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(f"Error in audio input task: {e}")
|
||||
audio_data = await self._client.get_next_audio_frame()
|
||||
if audio_data:
|
||||
audio_frame_event, participant_id = audio_data
|
||||
pipecat_audio_frame = self._convert_livekit_audio_to_pipecat(audio_frame_event)
|
||||
input_audio_frame = InputAudioRawFrame(
|
||||
audio=pipecat_audio_frame.audio,
|
||||
sample_rate=pipecat_audio_frame.sample_rate,
|
||||
num_channels=pipecat_audio_frame.num_channels,
|
||||
)
|
||||
await self.push_audio_frame(input_audio_frame)
|
||||
|
||||
def _convert_livekit_audio_to_pipecat(
|
||||
self, audio_frame_event: rtc.AudioFrameEvent
|
||||
@@ -451,7 +478,7 @@ class LiveKitTransport(BaseTransport):
|
||||
self._params = params
|
||||
|
||||
self._client = LiveKitTransportClient(
|
||||
url, token, room_name, self._params, callbacks, self._loop
|
||||
url, token, room_name, self._params, callbacks, self._loop, self.name
|
||||
)
|
||||
self._input: LiveKitInputTransport | None = None
|
||||
self._output: LiveKitOutputTransport | None = None
|
||||
|
||||
114
src/pipecat/utils/asyncio.py
Normal file
@@ -0,0 +1,114 @@
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import asyncio
from typing import Coroutine, Optional, Set

from loguru import logger

_TASKS: Set[asyncio.Task] = set()


def create_task(loop: asyncio.AbstractEventLoop, coroutine: Coroutine, name: str) -> asyncio.Task:
    """
    Creates and schedules a new asyncio Task that runs the given coroutine.

    The task is added to a global set of created tasks.

    Args:
        loop (asyncio.AbstractEventLoop): The event loop to use for creating the task.
        coroutine (Coroutine): The coroutine to be executed within the task.
        name (str): The name to assign to the task for identification.

    Returns:
        asyncio.Task: The created task object.
    """

    async def run_coroutine():
        try:
            await coroutine
        except asyncio.CancelledError:
            logger.trace(f"{name}: task cancelled")
            # Re-raise the exception to ensure the task is cancelled.
            raise
        except Exception as e:
            logger.exception(f"{name}: unexpected exception: {e}")

    task = loop.create_task(run_coroutine())
    task.set_name(name)
    _TASKS.add(task)
    logger.trace(f"{name}: task created")
    return task


async def wait_for_task(task: asyncio.Task, timeout: Optional[float] = None):
    """Wait for an asyncio.Task to complete with optional timeout handling.

    This function awaits the specified asyncio.Task and handles scenarios for
    timeouts, cancellations, and other exceptions. It also ensures that the task
    is removed from the set of registered tasks upon completion or failure.

    Args:
        task (asyncio.Task): The asyncio Task to wait for.
        timeout (Optional[float], optional): The maximum number of seconds
            to wait for the task to complete. If None, waits indefinitely.
            Defaults to None.
    """
    name = task.get_name()
    try:
        if timeout:
            await asyncio.wait_for(task, timeout=timeout)
        else:
            await task
    except asyncio.TimeoutError:
        logger.warning(f"{name}: timed out waiting for task to finish")
    except asyncio.CancelledError:
        logger.trace(f"{name}: unexpected task cancellation (maybe Ctrl-C?)")
    except Exception as e:
        logger.exception(f"{name}: unexpected exception while stopping task: {e}")
    finally:
        try:
            _TASKS.remove(task)
        except KeyError as e:
            logger.trace(f"{name}: unable to remove task (already removed?): {e}")


async def cancel_task(task: asyncio.Task, timeout: Optional[float] = None):
    """Cancels the given asyncio Task and awaits its completion with an
    optional timeout.

    This function removes the task from the set of registered tasks upon
    completion or failure.

    Args:
        task (asyncio.Task): The task to be cancelled.
        timeout (Optional[float]): The optional timeout in seconds to wait for the task to cancel.

    """
    name = task.get_name()
    task.cancel()
    try:
        if timeout:
            await asyncio.wait_for(task, timeout=timeout)
        else:
            await task
    except asyncio.TimeoutError:
        logger.warning(f"{name}: timed out waiting for task to cancel")
    except asyncio.CancelledError:
        # Here we are sure the task is cancelled properly.
        pass
    except Exception as e:
        logger.exception(f"{name}: unexpected exception while cancelling task: {e}")
    finally:
        try:
            _TASKS.remove(task)
        except KeyError as e:
            logger.trace(f"{name}: unable to remove task (already removed?): {e}")


def current_tasks() -> Set[asyncio.Task]:
    """Returns the set of currently created/registered tasks."""
    return _TASKS
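The create/cancel lifecycle that this new module introduces can be exercised in isolation. The following is a minimal sketch, assuming only the standard library: it re-declares trimmed versions of `create_task` and `cancel_task` (no `loguru`, no timeout handling) so that it runs on its own, and `ticker` and `demo` are hypothetical names used purely for illustration.

```python
import asyncio
from typing import Coroutine, Set

# Registry of live tasks, mirroring the module-level set in the diff.
_TASKS: Set[asyncio.Task] = set()


def create_task(loop: asyncio.AbstractEventLoop, coroutine: Coroutine, name: str) -> asyncio.Task:
    """Wrap the coroutine so errors are reported and cancellation propagates."""

    async def run_coroutine():
        try:
            await coroutine
        except asyncio.CancelledError:
            # Re-raise so the task really ends up in the cancelled state.
            raise
        except Exception as e:
            print(f"{name}: unexpected exception: {e}")

    task = loop.create_task(run_coroutine())
    task.set_name(name)
    _TASKS.add(task)
    return task


async def cancel_task(task: asyncio.Task) -> None:
    """Cancel the task, wait for it to finish, and unregister it."""
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    finally:
        _TASKS.discard(task)


async def ticker(ticks: list) -> None:
    # Hypothetical worker: loops forever with no CancelledError handling,
    # like the simplified task handlers in the transports above.
    while True:
        ticks.append(1)
        await asyncio.sleep(0.01)


async def demo() -> dict:
    ticks: list = []
    task = create_task(asyncio.get_running_loop(), ticker(ticks), "demo::ticker")
    registered = task in _TASKS
    await asyncio.sleep(0.05)
    await cancel_task(task)
    return {
        "registered": registered,
        "cancelled": task.cancelled(),
        "remaining": len(_TASKS),
        "ticked": len(ticks) > 0,
    }


result = asyncio.run(demo())
```

Because the wrapper re-raises `asyncio.CancelledError`, the worker loop itself needs no `try`/`except`, which is consistent with the handlers in this diff dropping their own `except asyncio.CancelledError: break` blocks.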
|
||||
@@ -14,7 +14,7 @@ ENDOFSENTENCE_PATTERN_STR = r"""
|
||||
(?<!Mrs) # Negative lookbehind: not preceded by "Mrs"
|
||||
(?<!Prof) # Negative lookbehind: not preceded by "Prof"
|
||||
[\.\?\!:;]| # Match a period, question mark, exclamation point, colon, or semicolon
|
||||
[。?!:;] # the full-width version (mainly used in East Asian languages such as Chinese)
|
||||
[。?!:;।] # the full-width version (mainly used in East Asian languages such as Chinese, Hindi)
|
||||
$ # End of string
|
||||
"""
|
||||
ENDOFSENTENCE_PATTERN = re.compile(ENDOFSENTENCE_PATTERN_STR, re.VERBOSE)
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import collections
|
||||
import itertools
|
||||
|
||||
|
||||
@@ -32,8 +32,7 @@ from pipecat.processors.frameworks.langchain import LangchainProcessor
|
||||
class TestLangchain(unittest.IsolatedAsyncioTestCase):
|
||||
class MockProcessor(FrameProcessor):
|
||||
def __init__(self, name):
|
||||
super().__init__()
|
||||
self.name = name
|
||||
super().__init__(name=name)
|
||||
self.token: list[str] = []
|
||||
# Start collecting tokens when we see the start frame
|
||||
self.start_collecting = False
|
||||
|
||||
@@ -38,3 +38,14 @@ class TestUtilsString(unittest.IsolatedAsyncioTestCase):
|
||||
for i in chinese_sentences:
|
||||
assert match_endofsentence(i)
|
||||
assert not match_endofsentence("你好,")
|
||||
|
||||
async def test_endofsentence_hi(self):
|
||||
hindi_sentences = [
|
||||
"हैलो।",
|
||||
"हैलो!",
|
||||
"आप खाये हैं?",
|
||||
"सुरक्षा पहले।",
|
||||
]
|
||||
for i in hindi_sentences:
|
||||
assert match_endofsentence(i)
|
||||
assert not match_endofsentence("हैलो,")
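The effect of adding the danda (`।`) to the end-of-sentence character class can be checked with a much smaller pattern. This is a simplified sketch, not the project's `ENDOFSENTENCE_PATTERN`: it omits the lookbehinds for abbreviations and titles, and `SIMPLE_EOS` and `ends_sentence` are hypothetical names.

```python
import re

# Simplified character class: ASCII terminators, their full-width
# counterparts, and the Devanagari danda, anchored at the end of the string.
SIMPLE_EOS = re.compile(r"[.?!:;。?!:;।]$")


def ends_sentence(text: str) -> bool:
    """Return True when the text ends with a sentence terminator."""
    return SIMPLE_EOS.search(text.rstrip()) is not None


hindi_ok = ends_sentence("हैलो।")
hindi_comma = ends_sentence("हैलो,")
chinese_ok = ends_sentence("你好。")
```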
|
||||
|
||||