diff --git a/CHANGELOG.md b/CHANGELOG.md
index ef53cf663..8402013f8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,38 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
+- `BaseOutputTransport` now allows multiple destinations if the transport
+ implementation supports it (e.g. Daily's custom tracks). With multiple
+ destinations it is possible to send different audio or video tracks with a
+ single transport simultaneously. To do that, you need to set the new
+ `Frame.transport_destination` field with your desired transport destination
+ (e.g. custom track name), tell the transport you want a new destination with
+ `TransportParams.audio_out_destinations` or
+ `TransportParams.video_out_destinations` and the transport should take care of
+ the rest.
+
+- Similar to the new `Frame.transport_destination`, there's a new
+ `Frame.transport_source` field which is set by the `BaseInputTransport` if the
+ incoming data comes from a non-default source (e.g. custom tracks).
+
+- `TTSService` has a new `transport_destination` constructor parameter. This
+ parameter will be used to update the `Frame.transport_destination` field for
+ each generated `TTSAudioRawFrame`. This allows sending multiple bots' audio to
+ multiple destinations in the same pipeline.
+
+- Added `DailyTransportParams.camera_out_enabled` and
+ `DailyTransportParams.microphone_out_enabled` which allows you to
+ enable/disable the main output camera or microphone tracks. This is useful if
+ you only want to use custom tracks and not send the main tracks. Note that you
+ still need `audio_out_enabled=True` or `video_out_enabled`.
+
+- Added `DailyTransport.capture_participant_audio()` which allows you to capture
+ an audio source (e.g. "microphone", "screenAudio" or a custom track name) from
+ a remote participant.
+
+- Added `DailyTransport.update_publishing()` which allows you to update the call
+ video and audio publishing settings (e.g. audio and video quality).
+
- Added `RTVIObserverParams` which allows you to configure what RTVI messages
are sent to the clients.
@@ -37,6 +69,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed
+- `TransportParams.audio_mixer` now supports a string and also a dictionary to
+ provide a mixer per destination. For example:
+
+```python
+ audio_out_mixer={
+ "track-1": SoundfileMixer(...),
+ "track-2": SoundfileMixer(...),
+ "track-N": SoundfileMixer(...),
+ },
+```
+
- The `STTMuteFilter` now mutes `InterimTranscriptionFrame` and
`TranscriptionFrame` which allows the `STTMuteFilter` to be used in
conjunction with transports that generate transcripts, e.g. `DailyTransport`.
@@ -105,6 +148,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Other
+- Added `examples/daily-custom-tracks` to show how to send and receive Daily
+ custom tracks.
+
+- Added `examples/daily-multi-translation` to showcase how to send multiple
+ simulataneous translations with the same transport.
+
- Added 04 foundational examples for client/server transports. Also, renamed
`29-livekit-audio-chat.py` to `04b-transports-livekit.py`.
diff --git a/examples/chatbot-audio-recording/runner.py b/examples/chatbot-audio-recording/runner.py
index 50743fd09..ad39a3ac4 100644
--- a/examples/chatbot-audio-recording/runner.py
+++ b/examples/chatbot-audio-recording/runner.py
@@ -53,4 +53,3 @@ async def configure(aiohttp_session: aiohttp.ClientSession):
token = await daily_rest_helper.get_token(url, expiry_time)
return (url, token)
- return (url, token)
diff --git a/examples/daily-custom-tracks/README.md b/examples/daily-custom-tracks/README.md
new file mode 100644
index 000000000..dfd870373
--- /dev/null
+++ b/examples/daily-custom-tracks/README.md
@@ -0,0 +1,39 @@
+# Daily Custom Tracks
+
+This example shows how to send and receive Daily custom tracks. We will run a simple `daily-python` application to send an audio file with a custom track (named "pipecat") to a room. Then, the Pipecat bot will mirror that custom track into another custom track (named "pipecat-mirror") in the same room.
+
+## Get started
+
+```python
+python3 -m venv venv
+source venv/bin/activate
+pip install -r requirements.txt
+```
+
+## Run the bot
+
+Start the bot by giving it a Daily room URL.
+
+```bash
+python bot.py -u ROOM_URL
+```
+
+The bot will wait for the first participant to join. Then, it will mirror a custom track named "pipecat" into a new custom track named "pipecat-mirror".
+
+## Run the sender
+
+Now, run the custom track sender. This is a simple `daily-python` application that opens and audio file and sends it as a custom track to the same Daily room.
+
+```bash
+python custom_track_sender.py -u ROOM_URL -i office-ambience-mono-16000.mp3
+```
+
+## Open client
+
+Finally, open the client so you can hear both custom tracks.
+
+```bash
+open index.html
+```
+
+Once the client is opened, copy the URL of the Daily room and join it. You should be able to select which custom track you want to hear.
diff --git a/examples/daily-custom-tracks/bot.py b/examples/daily-custom-tracks/bot.py
new file mode 100644
index 000000000..17d0e2dfa
--- /dev/null
+++ b/examples/daily-custom-tracks/bot.py
@@ -0,0 +1,87 @@
+#
+# Copyright (c) 2024–2025, Daily
+#
+# SPDX-License-Identifier: BSD 2-Clause License
+#
+
+import asyncio
+import sys
+
+import aiohttp
+from loguru import logger
+from runner import configure
+
+from pipecat.frames.frames import Frame, InputAudioRawFrame, OutputAudioRawFrame
+from pipecat.pipeline.pipeline import Pipeline
+from pipecat.pipeline.runner import PipelineRunner
+from pipecat.pipeline.task import PipelineParams, PipelineTask
+from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
+from pipecat.transports.services.daily import DailyParams, DailyTransport
+
+logger.remove(0)
+logger.add(sys.stderr, level="DEBUG")
+
+
+class CustomTrackMirrorProcessor(FrameProcessor):
+ def __init__(self, transport_destination: str, **kwargs):
+ super().__init__(**kwargs)
+ self._transport_destination = transport_destination
+
+ async def process_frame(self, frame: Frame, direction: FrameDirection):
+ await super().process_frame(frame, direction)
+
+ if isinstance(frame, InputAudioRawFrame) and frame.transport_source:
+ output_frame = OutputAudioRawFrame(
+ audio=frame.audio,
+ sample_rate=frame.sample_rate,
+ num_channels=frame.num_channels,
+ )
+ output_frame.transport_destination = self._transport_destination
+ await self.push_frame(output_frame)
+ else:
+ await self.push_frame(frame, direction)
+
+
+async def main():
+ async with aiohttp.ClientSession() as session:
+ (room_url, _) = await configure(session)
+
+ transport = DailyTransport(
+ room_url,
+ None,
+ "Custom tracks mirror",
+ DailyParams(
+ audio_in_enabled=True,
+ audio_out_enabled=True,
+ microphone_out_enabled=False, # Disable since we just use custom tracks
+ audio_out_destinations=["pipecat-mirror"],
+ ),
+ )
+
+ pipeline = Pipeline(
+ [
+ transport.input(), # Transport user input
+ CustomTrackMirrorProcessor("pipecat-mirror"),
+ transport.output(), # Transport bot output
+ ]
+ )
+
+ task = PipelineTask(
+ pipeline,
+ params=PipelineParams(
+ audio_in_sample_rate=16000,
+ audio_out_sample_rate=16000,
+ ),
+ )
+
+ @transport.event_handler("on_first_participant_joined")
+ async def on_first_participant_joined(transport, participant):
+ await transport.capture_participant_audio(participant["id"], audio_source="pipecat")
+
+ runner = PipelineRunner()
+
+ await runner.run(task)
+
+
+if __name__ == "__main__":
+ asyncio.run(main())
diff --git a/examples/daily-custom-tracks/custom_track_sender.py b/examples/daily-custom-tracks/custom_track_sender.py
new file mode 100644
index 000000000..80c3cfbe6
--- /dev/null
+++ b/examples/daily-custom-tracks/custom_track_sender.py
@@ -0,0 +1,74 @@
+#
+# Copyright (c) 2024–2025, Daily
+#
+# SPDX-License-Identifier: BSD 2-Clause License
+#
+
+import argparse
+import time
+
+from daily import CallClient, CustomAudioSource, Daily
+from pydub import AudioSegment
+
+parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
+parser.add_argument("-u", "--url", type=str, required=True, help="URL of the Daily room to join")
+parser.add_argument(
+ "-i", "--input", type=str, required=True, help="Input audio file (needs 16000 sample rate)"
+)
+
+args, _ = parser.parse_known_args()
+
+audio = AudioSegment.from_mp3(args.input)
+
+raw_bytes = audio.raw_data
+sample_rate = audio.frame_rate
+channels = audio.channels
+
+print(f"Length: {len(raw_bytes)} bytes")
+print(f"Sample rate: {sample_rate}, Channels: {channels}")
+
+# Initialize the Daily context & create call client
+Daily.init()
+
+client = CallClient()
+
+# Join the room and indicate we have a custom track named "pipecat".
+client.join(
+ args.url,
+ client_settings={
+ "publishing": {
+ "camera": False,
+ "microphone": False,
+ "customAudio": {"pipecat": True},
+ },
+ },
+)
+
+# Just sleep for a couple of seconds. To do this well we should really use
+# completions.
+time.sleep(2)
+
+# Create the custom audio source. This is where we will write our audio.
+audio_source = CustomAudioSource(sample_rate, channels)
+
+# Create an audio track and assign it our audio source.
+client.add_custom_audio_track("pipecat", audio_source)
+
+# Just sleep for a second. To do this well we should really use completions.
+time.sleep(1)
+
+try:
+ # Just write one second of audio until we have read all the file.
+ chunk_size = sample_rate * channels * 2
+ while len(raw_bytes) > 0:
+ chunk = raw_bytes[:chunk_size]
+ raw_bytes = raw_bytes[chunk_size:]
+ audio_source.write_frames(chunk)
+
+except KeyboardInterrupt:
+ client.leave()
+
+# Just sleep for a second. To do this well we should really use completions.
+time.sleep(1)
+
+client.release()
diff --git a/examples/daily-custom-tracks/index.html b/examples/daily-custom-tracks/index.html
new file mode 100644
index 000000000..4b3f693f6
--- /dev/null
+++ b/examples/daily-custom-tracks/index.html
@@ -0,0 +1,173 @@
+
+
+ daily custom tracks
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/examples/daily-custom-tracks/office-ambience-mono-16000.mp3 b/examples/daily-custom-tracks/office-ambience-mono-16000.mp3
new file mode 100644
index 000000000..ea98082c7
Binary files /dev/null and b/examples/daily-custom-tracks/office-ambience-mono-16000.mp3 differ
diff --git a/examples/daily-custom-tracks/requirements.txt b/examples/daily-custom-tracks/requirements.txt
new file mode 100644
index 000000000..b3d2deec3
--- /dev/null
+++ b/examples/daily-custom-tracks/requirements.txt
@@ -0,0 +1,2 @@
+pydub
+pipecat-ai[daily]
diff --git a/examples/daily-custom-tracks/runner.py b/examples/daily-custom-tracks/runner.py
new file mode 100644
index 000000000..ad39a3ac4
--- /dev/null
+++ b/examples/daily-custom-tracks/runner.py
@@ -0,0 +1,55 @@
+#
+# Copyright (c) 2024–2025, Daily
+#
+# SPDX-License-Identifier: BSD 2-Clause License
+#
+
+import argparse
+import os
+
+import aiohttp
+
+from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper
+
+
+async def configure(aiohttp_session: aiohttp.ClientSession):
+ parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
+ parser.add_argument(
+ "-u", "--url", type=str, required=False, help="URL of the Daily room to join"
+ )
+ parser.add_argument(
+ "-k",
+ "--apikey",
+ type=str,
+ required=False,
+ help="Daily API Key (needed to create an owner token for the room)",
+ )
+
+ args, unknown = parser.parse_known_args()
+
+ url = args.url or os.getenv("DAILY_SAMPLE_ROOM_URL")
+ key = args.apikey or os.getenv("DAILY_API_KEY")
+
+ if not url:
+ raise Exception(
+ "No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL."
+ )
+
+ if not key:
+ raise Exception(
+ "No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers."
+ )
+
+ daily_rest_helper = DailyRESTHelper(
+ daily_api_key=key,
+ daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
+ aiohttp_session=aiohttp_session,
+ )
+
+ # Create a meeting token for the given room with an expiration 1 hour in
+ # the future.
+ expiry_time: float = 60 * 60
+
+ token = await daily_rest_helper.get_token(url, expiry_time)
+
+ return (url, token)
diff --git a/examples/daily-multi-translation/Dockerfile b/examples/daily-multi-translation/Dockerfile
new file mode 100644
index 000000000..419adca34
--- /dev/null
+++ b/examples/daily-multi-translation/Dockerfile
@@ -0,0 +1,15 @@
+FROM python:3.10-bullseye
+
+RUN mkdir /app
+RUN mkdir /app/assets
+RUN mkdir /app/utils
+COPY *.py /app/
+COPY requirements.txt /app/
+
+
+WORKDIR /app
+RUN pip3 install -r requirements.txt
+
+EXPOSE 7860
+
+CMD ["python3", "server.py"]
diff --git a/examples/daily-multi-translation/README.md b/examples/daily-multi-translation/README.md
new file mode 100644
index 000000000..7e27cb217
--- /dev/null
+++ b/examples/daily-multi-translation/README.md
@@ -0,0 +1,39 @@
+# Daily Multi Translation
+
+This example shows how to use Daily to stream multiple simultaneous translations using a single transport. Daily provides custom tracks and in this example we will simultaneously translate incoming audio in English to Spanish, French and German, each of them being sent to a custom track.
+
+## Get started
+
+```python
+python3 -m venv venv
+source venv/bin/activate
+pip install -r requirements.txt
+
+cp env.example .env # and add your credentials
+
+```
+
+## Run the server
+
+```bash
+python server.py
+```
+
+Then, visit `http://localhost:7860/` in your browser. This will open a Daily Prebuilt room where you will speak in English (make sure you are not muted).
+
+## Open client
+
+Next, you need to open the client that will listen to the translations.
+
+```bash
+open index.html
+```
+
+Once the client is opened, copy the URL of the Daily room created above and join it. You should be able to select which translation you want to hear.
+
+## Build and test the Docker image
+
+```
+docker build -t daily-multi-translation .
+docker run --env-file .env -p 7860:7860 daily-multi-translation
+```
diff --git a/examples/daily-multi-translation/bot.py b/examples/daily-multi-translation/bot.py
new file mode 100644
index 000000000..5f5037584
--- /dev/null
+++ b/examples/daily-multi-translation/bot.py
@@ -0,0 +1,165 @@
+#
+# Copyright (c) 2024–2025, Daily
+#
+# SPDX-License-Identifier: BSD 2-Clause License
+#
+
+import asyncio
+import os
+import sys
+
+import aiohttp
+from dotenv import load_dotenv
+from loguru import logger
+from runner import configure
+
+from pipecat.audio.mixers.soundfile_mixer import SoundfileMixer
+from pipecat.audio.vad.silero import SileroVADAnalyzer
+from pipecat.observers.loggers.transcription_log_observer import TranscriptionLogObserver
+from pipecat.pipeline.parallel_pipeline import ParallelPipeline
+from pipecat.pipeline.pipeline import Pipeline
+from pipecat.pipeline.runner import PipelineRunner
+from pipecat.pipeline.task import PipelineParams, PipelineTask
+from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
+from pipecat.services.cartesia.tts import CartesiaTTSService
+from pipecat.services.deepgram.stt import DeepgramSTTService
+from pipecat.services.openai.llm import OpenAILLMService
+from pipecat.transports.services.daily import DailyParams, DailyTransport
+
+load_dotenv(override=True)
+
+logger.remove(0)
+logger.add(sys.stderr, level="DEBUG")
+
+BACKGROUND_SOUND_FILE = "office-ambience-mono-16000.mp3"
+
+
+async def main():
+ async with aiohttp.ClientSession() as session:
+ (room_url, token) = await configure(session)
+
+ transport = DailyTransport(
+ room_url,
+ token,
+ "Multi translation bot",
+ DailyParams(
+ audio_in_enabled=True,
+ audio_out_enabled=True,
+ audio_out_mixer={
+ "spanish": SoundfileMixer(
+ sound_files={"office": BACKGROUND_SOUND_FILE}, default_sound="office"
+ ),
+ "french": SoundfileMixer(
+ sound_files={"office": BACKGROUND_SOUND_FILE}, default_sound="office"
+ ),
+ "german": SoundfileMixer(
+ sound_files={"office": BACKGROUND_SOUND_FILE}, default_sound="office"
+ ),
+ },
+ audio_out_destinations=["spanish", "french", "german"],
+ microphone_out_enabled=False, # Disable since we just use custom tracks
+ vad_analyzer=SileroVADAnalyzer(),
+ ),
+ )
+
+ stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
+
+ tts_spanish = CartesiaTTSService(
+ api_key=os.getenv("CARTESIA_API_KEY"),
+ voice_id="cefcb124-080b-4655-b31f-932f3ee743de",
+ transport_destination="spanish",
+ )
+ tts_french = CartesiaTTSService(
+ api_key=os.getenv("CARTESIA_API_KEY"),
+ voice_id="8832a0b5-47b2-4751-bb22-6a8e2149303d",
+ transport_destination="french",
+ )
+ tts_german = CartesiaTTSService(
+ api_key=os.getenv("CARTESIA_API_KEY"),
+ voice_id="38aabb6a-f52b-4fb0-a3d1-988518f4dc06",
+ transport_destination="german",
+ )
+
+ messages_spanish = [
+ {
+ "role": "system",
+ "content": "You will be provided with a sentence in English, and your task is to only translate it into Spanish.",
+ },
+ ]
+ messages_french = [
+ {
+ "role": "system",
+ "content": "You will be provided with a sentence in English, and your task is to only translate it into French.",
+ },
+ ]
+ messages_german = [
+ {
+ "role": "system",
+ "content": "You will be provided with a sentence in English, and your task is to only translate it into German.",
+ },
+ ]
+
+ llm_spanish = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
+ llm_french = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
+ llm_german = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
+
+ context_spanish = OpenAILLMContext(messages_spanish)
+ context_aggregator_spanish = llm_spanish.create_context_aggregator(context_spanish)
+
+ context_french = OpenAILLMContext(messages_french)
+ context_aggregator_french = llm_french.create_context_aggregator(context_french)
+
+ context_german = OpenAILLMContext(messages_german)
+ context_aggregator_german = llm_german.create_context_aggregator(context_german)
+
+ pipeline = Pipeline(
+ [
+ transport.input(), # Transport user input
+ stt,
+ ParallelPipeline(
+ # Spanish pipeline.
+ [
+ context_aggregator_spanish.user(),
+ llm_spanish,
+ tts_spanish,
+ context_aggregator_spanish.assistant(),
+ ],
+ # French pipeline.
+ [
+ context_aggregator_french.user(),
+ llm_french,
+ tts_french,
+ context_aggregator_french.assistant(),
+ ],
+ # German pipeline.
+ [
+ context_aggregator_german.user(),
+ llm_german,
+ tts_german,
+ context_aggregator_german.assistant(),
+ ],
+ ),
+ transport.output(), # Transport bot output
+ ]
+ )
+
+ task = PipelineTask(
+ pipeline,
+ params=PipelineParams(
+ audio_in_sample_rate=16000,
+ audio_out_sample_rate=16000,
+ allow_interruptions=True,
+ enable_metrics=True,
+ enable_usage_metrics=True,
+ report_only_initial_ttfb=True,
+ ),
+ observers=[TranscriptionLogObserver()],
+ )
+
+ runner = PipelineRunner()
+
+ await runner.run(task)
+
+
+if __name__ == "__main__":
+ asyncio.run(main())
diff --git a/examples/daily-multi-translation/env.example b/examples/daily-multi-translation/env.example
new file mode 100644
index 000000000..a780ec7d8
--- /dev/null
+++ b/examples/daily-multi-translation/env.example
@@ -0,0 +1,5 @@
+DAILY_SAMPLE_ROOM_URL=https://yourdomain.daily.co/yourroom # (for joining the bot to the same room repeatedly for local dev)
+DAILY_API_KEY=7df...
+OPENAI_API_KEY=sk-PL...
+DEEPGRAM_API_KEY=efb...
+CARTESIA_API_KEY=aeb...
diff --git a/examples/daily-multi-translation/index.html b/examples/daily-multi-translation/index.html
new file mode 100644
index 000000000..c8ca0832d
--- /dev/null
+++ b/examples/daily-multi-translation/index.html
@@ -0,0 +1,202 @@
+
+
+ daily multi translation
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/examples/daily-multi-translation/office-ambience-mono-16000.mp3 b/examples/daily-multi-translation/office-ambience-mono-16000.mp3
new file mode 100644
index 000000000..ea98082c7
Binary files /dev/null and b/examples/daily-multi-translation/office-ambience-mono-16000.mp3 differ
diff --git a/examples/daily-multi-translation/requirements.txt b/examples/daily-multi-translation/requirements.txt
new file mode 100644
index 000000000..e20c41d5a
--- /dev/null
+++ b/examples/daily-multi-translation/requirements.txt
@@ -0,0 +1,5 @@
+aiofiles
+python-dotenv
+fastapi[all]
+uvicorn
+pipecat-ai[daily,deepgram,openai,silero,cartesia]
diff --git a/examples/daily-multi-translation/runner.py b/examples/daily-multi-translation/runner.py
new file mode 100644
index 000000000..ad39a3ac4
--- /dev/null
+++ b/examples/daily-multi-translation/runner.py
@@ -0,0 +1,55 @@
+#
+# Copyright (c) 2024–2025, Daily
+#
+# SPDX-License-Identifier: BSD 2-Clause License
+#
+
+import argparse
+import os
+
+import aiohttp
+
+from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper
+
+
+async def configure(aiohttp_session: aiohttp.ClientSession):
+ parser = argparse.ArgumentParser(description="Daily AI SDK Bot Sample")
+ parser.add_argument(
+ "-u", "--url", type=str, required=False, help="URL of the Daily room to join"
+ )
+ parser.add_argument(
+ "-k",
+ "--apikey",
+ type=str,
+ required=False,
+ help="Daily API Key (needed to create an owner token for the room)",
+ )
+
+ args, unknown = parser.parse_known_args()
+
+ url = args.url or os.getenv("DAILY_SAMPLE_ROOM_URL")
+ key = args.apikey or os.getenv("DAILY_API_KEY")
+
+ if not url:
+ raise Exception(
+ "No Daily room specified. use the -u/--url option from the command line, or set DAILY_SAMPLE_ROOM_URL in your environment to specify a Daily room URL."
+ )
+
+ if not key:
+ raise Exception(
+ "No Daily API key specified. use the -k/--apikey option from the command line, or set DAILY_API_KEY in your environment to specify a Daily API key, available from https://dashboard.daily.co/developers."
+ )
+
+ daily_rest_helper = DailyRESTHelper(
+ daily_api_key=key,
+ daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
+ aiohttp_session=aiohttp_session,
+ )
+
+ # Create a meeting token for the given room with an expiration 1 hour in
+ # the future.
+ expiry_time: float = 60 * 60
+
+ token = await daily_rest_helper.get_token(url, expiry_time)
+
+ return (url, token)
diff --git a/examples/daily-multi-translation/server.py b/examples/daily-multi-translation/server.py
new file mode 100644
index 000000000..a0f38854c
--- /dev/null
+++ b/examples/daily-multi-translation/server.py
@@ -0,0 +1,139 @@
+#
+# Copyright (c) 2024–2025, Daily
+#
+# SPDX-License-Identifier: BSD 2-Clause License
+#
+
+import argparse
+import os
+import subprocess
+from contextlib import asynccontextmanager
+
+import aiohttp
+from dotenv import load_dotenv
+from fastapi import FastAPI, HTTPException, Request
+from fastapi.middleware.cors import CORSMiddleware
+from fastapi.responses import JSONResponse, RedirectResponse
+
+from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomParams
+
+MAX_BOTS_PER_ROOM = 1
+
+# Bot sub-process dict for status reporting and concurrency control
+bot_procs = {}
+
+daily_helpers = {}
+
+load_dotenv(override=True)
+
+
+def cleanup():
+ # Clean up function, just to be extra safe
+ for entry in bot_procs.values():
+ proc = entry[0]
+ proc.terminate()
+ proc.wait()
+
+
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+ aiohttp_session = aiohttp.ClientSession()
+ daily_helpers["rest"] = DailyRESTHelper(
+ daily_api_key=os.getenv("DAILY_API_KEY", ""),
+ daily_api_url=os.getenv("DAILY_API_URL", "https://api.daily.co/v1"),
+ aiohttp_session=aiohttp_session,
+ )
+ yield
+ await aiohttp_session.close()
+ cleanup()
+
+
+app = FastAPI(lifespan=lifespan)
+
+app.add_middleware(
+ CORSMiddleware,
+ allow_origins=["*"],
+ allow_credentials=True,
+ allow_methods=["*"],
+ allow_headers=["*"],
+)
+
+
+@app.get("/")
+async def start_agent(request: Request):
+ print(f"!!! Creating room")
+ room = await daily_helpers["rest"].create_room(DailyRoomParams())
+ print(f"!!! Room URL: {room.url}")
+ # Ensure the room property is present
+ if not room.url:
+ raise HTTPException(
+ status_code=500,
+ detail="Missing 'room' property in request data. Cannot start agent without a target room!",
+ )
+
+ # Check if there is already an existing process running in this room
+ num_bots_in_room = sum(
+ 1 for proc in bot_procs.values() if proc[1] == room.url and proc[0].poll() is None
+ )
+ if num_bots_in_room >= MAX_BOTS_PER_ROOM:
+ raise HTTPException(status_code=500, detail=f"Max bot limited reach for room: {room.url}")
+
+ # Get the token for the room
+ token = await daily_helpers["rest"].get_token(room.url)
+
+ if not token:
+ raise HTTPException(status_code=500, detail=f"Failed to get token for room: {room.url}")
+
+ # Spawn a new agent, and join the user session
+ # Note: this is mostly for demonstration purposes (refer to 'deployment' in README)
+ try:
+ proc = subprocess.Popen(
+ [f"python3 -m bot -u {room.url} -t {token}"],
+ shell=True,
+ bufsize=1,
+ cwd=os.path.dirname(os.path.abspath(__file__)),
+ )
+ bot_procs[proc.pid] = (proc, room.url)
+ except Exception as e:
+ raise HTTPException(status_code=500, detail=f"Failed to start subprocess: {e}")
+
+ return RedirectResponse(room.url)
+
+
+@app.get("/status/{pid}")
+def get_status(pid: int):
+ # Look up the subprocess
+ proc = bot_procs.get(pid)
+
+ # If the subprocess doesn't exist, return an error
+ if not proc:
+ raise HTTPException(status_code=404, detail=f"Bot with process id: {pid} not found")
+
+ # Check the status of the subprocess
+ if proc[0].poll() is None:
+ status = "running"
+ else:
+ status = "finished"
+
+ return JSONResponse({"bot_id": pid, "status": status})
+
+
+if __name__ == "__main__":
+ import uvicorn
+
+ default_host = os.getenv("HOST", "0.0.0.0")
+ default_port = int(os.getenv("FAST_API_PORT", "7860"))
+
+ parser = argparse.ArgumentParser(description="Daily Storyteller FastAPI server")
+ parser.add_argument("--host", type=str, default=default_host, help="Host address")
+ parser.add_argument("--port", type=int, default=default_port, help="Port number")
+ parser.add_argument("--reload", action="store_true", help="Reload code on change")
+
+ config = parser.parse_args()
+
+ uvicorn.run(
+ "server:app",
+ host=config.host,
+ port=config.port,
+ reload=config.reload,
+ )
diff --git a/examples/sentry-metrics/runner.py b/examples/sentry-metrics/runner.py
index 50743fd09..ad39a3ac4 100644
--- a/examples/sentry-metrics/runner.py
+++ b/examples/sentry-metrics/runner.py
@@ -53,4 +53,3 @@ async def configure(aiohttp_session: aiohttp.ClientSession):
token = await daily_rest_helper.get_token(url, expiry_time)
return (url, token)
- return (url, token)
diff --git a/pyproject.toml b/pyproject.toml
index ca7f9db61..f7db97ea0 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -47,7 +47,7 @@ canonical = [ "aiofiles~=24.1.0" ]
cartesia = [ "cartesia~=1.4.0", "websockets~=13.1" ]
cerebras = []
deepseek = []
-daily = [ "daily-python~=0.17.0" ]
+daily = [ "daily-python~=0.18.0" ]
deepgram = [ "deepgram-sdk~=3.8.0" ]
elevenlabs = [ "websockets~=13.1" ]
fal = [ "fal-client~=0.5.9" ]
diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py
index 0d7d90d78..05f5b666d 100644
--- a/src/pipecat/frames/frames.py
+++ b/src/pipecat/frames/frames.py
@@ -60,12 +60,16 @@ class Frame:
name: str = field(init=False)
pts: Optional[int] = field(init=False)
metadata: Dict[str, Any] = field(init=False)
+ transport_source: Optional[str] = field(init=False)
+ transport_destination: Optional[str] = field(init=False)
def __post_init__(self):
self.id: int = obj_id()
self.name: str = f"{self.__class__.__name__}#{obj_count(self)}"
self.pts: Optional[int] = None
self.metadata: Dict[str, Any] = {}
+ self.transport_source: Optional[str] = None
+ self.transport_destination: Optional[str] = None
def __str__(self):
return self.name
@@ -136,8 +140,9 @@ class ImageRawFrame:
@dataclass
class OutputAudioRawFrame(DataFrame, AudioRawFrame):
- """A chunk of audio. Will be played by the output transport if the
- transport's microphone has been enabled.
+ """A chunk of audio. Will be played by the output transport. If the
+ transport supports multiple audio destinations (e.g. multiple audio tracks) the
+ destination name can be specified.
"""
@@ -147,13 +152,14 @@ class OutputAudioRawFrame(DataFrame, AudioRawFrame):
def __str__(self):
pts = format_pts(self.pts)
- return f"{self.name}(pts: {pts}, size: {len(self.audio)}, frames: {self.num_frames}, sample_rate: {self.sample_rate}, channels: {self.num_channels})"
+ return f"{self.name}(pts: {pts}, destination: {self.transport_destination}, size: {len(self.audio)}, frames: {self.num_frames}, sample_rate: {self.sample_rate}, channels: {self.num_channels})"
@dataclass
class OutputImageRawFrame(DataFrame, ImageRawFrame):
- """An image that will be shown by the transport if the transport's camera is
- enabled.
+ """An image that will be shown by the transport. If the transport supports
+ multiple video destinations (e.g. multiple video tracks) the destination
+ name can be specified.
"""
@@ -176,7 +182,7 @@ class URLImageRawFrame(OutputImageRawFrame):
"""
- url: Optional[str]
+ url: Optional[str] = None
def __str__(self):
pts = format_pts(self.pts)
@@ -716,7 +722,11 @@ class UserImageRequestFrame(SystemFrame):
@dataclass
class InputAudioRawFrame(SystemFrame, AudioRawFrame):
- """A chunk of audio usually coming from an input transport."""
+ """A chunk of audio usually coming from an input transport. If the transport
+ supports multiple audio sources (e.g. multiple audio tracks) the source name
+ will be specified.
+
+ """
def __post_init__(self):
super().__post_init__()
@@ -724,35 +734,50 @@ class InputAudioRawFrame(SystemFrame, AudioRawFrame):
def __str__(self):
pts = format_pts(self.pts)
- return f"{self.name}(pts: {pts}, size: {len(self.audio)}, frames: {self.num_frames}, sample_rate: {self.sample_rate}, channels: {self.num_channels})"
+ return f"{self.name}(pts: {pts}, source: {self.transport_source}, size: {len(self.audio)}, frames: {self.num_frames}, sample_rate: {self.sample_rate}, channels: {self.num_channels})"
@dataclass
class InputImageRawFrame(SystemFrame, ImageRawFrame):
- """An image usually coming from an input transport."""
+ """An image usually coming from an input transport. If the transport
+ supports multiple video sources (e.g. multiple video tracks) the source name
+ will be specified.
+
+ """
def __str__(self):
pts = format_pts(self.pts)
- return f"{self.name}(pts: {pts}, size: {self.size}, format: {self.format})"
+ return f"{self.name}(pts: {pts}, source: {self.transport_source}, size: {self.size}, format: {self.format})"
+
+
+@dataclass
+class UserAudioRawFrame(InputAudioRawFrame):
+ """A chunk of audio, usually coming from an input transport, associated to a user."""
+
+ user_id: str = ""
+
+ def __str__(self):
+ pts = format_pts(self.pts)
+ return f"{self.name}(pts: {pts}, user: {self.user_id}, source: {self.transport_source}, size: {len(self.audio)}, frames: {self.num_frames}, sample_rate: {self.sample_rate}, channels: {self.num_channels})"
@dataclass
class UserImageRawFrame(InputImageRawFrame):
"""An image associated to a user."""
- user_id: str
+ user_id: str = ""
request: Optional[UserImageRequestFrame] = None
def __str__(self):
pts = format_pts(self.pts)
- return f"{self.name}(pts: {pts}, user: {self.user_id}, size: {self.size}, format: {self.format}, request: {self.request})"
+ return f"{self.name}(pts: {pts}, user: {self.user_id}, source: {self.transport_source}, size: {self.size}, format: {self.format}, request: {self.request})"
@dataclass
class VisionImageRawFrame(InputImageRawFrame):
"""An image with an associated text to ask for a description of it."""
- text: Optional[str]
+ text: Optional[str] = None
def __str__(self):
pts = format_pts(self.pts)
diff --git a/src/pipecat/services/tts_service.py b/src/pipecat/services/tts_service.py
index a2b8e3f90..ed37caa01 100644
--- a/src/pipecat/services/tts_service.py
+++ b/src/pipecat/services/tts_service.py
@@ -66,6 +66,8 @@ class TTSService(AIService):
# Text filter executed after text has been aggregated.
text_filters: Sequence[BaseTextFilter] = [],
text_filter: Optional[BaseTextFilter] = None,
+ # Audio transport destination of the generated frames.
+ transport_destination: Optional[str] = None,
**kwargs,
):
super().__init__(**kwargs)
@@ -82,6 +84,8 @@ class TTSService(AIService):
self._settings: Dict[str, Any] = {}
self._text_aggregator: BaseTextAggregator = text_aggregator or SimpleTextAggregator()
self._text_filters: Sequence[BaseTextFilter] = text_filters
+ self._transport_destination: Optional[str] = transport_destination
+
if text_filter:
import warnings
@@ -207,13 +211,16 @@ class TTSService(AIService):
async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
if self._push_silence_after_stop and isinstance(frame, TTSStoppedFrame):
silence_num_bytes = int(self._silence_time_s * self.sample_rate * 2) # 16-bit
- await self.push_frame(
- TTSAudioRawFrame(
- audio=b"\x00" * silence_num_bytes,
- sample_rate=self.sample_rate,
- num_channels=1,
- )
+ silence_frame = TTSAudioRawFrame(
+ audio=b"\x00" * silence_num_bytes,
+ sample_rate=self.sample_rate,
+ num_channels=1,
)
+ silence_frame.transport_destination = self._transport_destination
+ await self.push_frame(silence_frame)
+
+ if isinstance(frame, (TTSStartedFrame, TTSStoppedFrame, TTSAudioRawFrame, TTSTextFrame)):
+ frame.transport_destination = self._transport_destination
await super().push_frame(frame, direction)
diff --git a/src/pipecat/transports/base_input.py b/src/pipecat/transports/base_input.py
index acb0058c6..51ebdb677 100644
--- a/src/pipecat/transports/base_input.py
+++ b/src/pipecat/transports/base_input.py
@@ -79,7 +79,7 @@ class BaseInputTransport(FrameProcessor):
)
self._params.audio_in_passthrough = True
- if self._params.camera_in_enabled or self._params.camera_out_enabled:
+ if self._params.camera_in_enabled:
import warnings
with warnings.catch_warnings():
diff --git a/src/pipecat/transports/base_output.py b/src/pipecat/transports/base_output.py
index de53fac60..31a86f92d 100644
--- a/src/pipecat/transports/base_output.py
+++ b/src/pipecat/transports/base_output.py
@@ -8,11 +8,12 @@ import asyncio
import itertools
import sys
import time
-from typing import AsyncGenerator, List
+from typing import Any, AsyncGenerator, Dict, List, Mapping, Optional
from loguru import logger
from PIL import Image
+from pipecat.audio.mixers.base_audio_mixer import BaseAudioMixer
from pipecat.audio.utils import create_default_resampler
from pipecat.frames.frames import (
BotSpeakingFrame,
@@ -46,35 +47,28 @@ class BaseOutputTransport(FrameProcessor):
self._params = params
- # Task to process incoming frames so we don't block upstream elements.
- self._sink_task = None
-
- # Task to process incoming frames using a clock.
- self._sink_clock_task = None
-
- # Task to write/send audio and image frames.
- self._video_out_task = None
-
- # These are the images that we should send at our desired framerate.
- self._video_images = None
-
# Output sample rate. It will be initialized on StartFrame.
self._sample_rate = 0
- self._resampler = create_default_resampler()
- # Chunk size that will be written. It will be computed on StartFrame
+ # We write 10ms*CHUNKS of audio at a time (where CHUNKS is the
+ # `audio_out_10ms_chunks` parameter). If we receive long audio frames we
+ # will chunk them. This helps with interruption handling. It will be
+ # initialized on StartFrame.
self._audio_chunk_size = 0
- self._audio_buffer = bytearray()
- self._stopped_event = asyncio.Event()
-
- # Indicates if the bot is currently speaking.
- self._bot_speaking = False
+ # We will have one media sender per output frame destination. This allow
+ # us to send multiple streams at the same time if the transport allows
+ # it.
+ self._media_senders: Dict[Any, "BaseOutputTransport.MediaSender"] = {}
@property
def sample_rate(self) -> int:
return self._sample_rate
+ @property
+ def audio_chunk_size(self) -> int:
+ return self._audio_chunk_size
+
async def start(self, frame: StartFrame):
self._sample_rate = self._params.audio_out_sample_rate or frame.audio_out_sample_rate
@@ -84,42 +78,63 @@ class BaseOutputTransport(FrameProcessor):
audio_bytes_10ms = int(self._sample_rate / 100) * self._params.audio_out_channels * 2
self._audio_chunk_size = audio_bytes_10ms * self._params.audio_out_10ms_chunks
- # Start audio mixer.
- if self._params.audio_out_mixer:
- await self._params.audio_out_mixer.start(self._sample_rate)
- self._create_video_task()
- self._create_sink_tasks()
+ # Register destinations.
+ for destination in self._params.audio_out_destinations:
+ await self.register_audio_destination(destination)
+
+ for destination in self._params.video_out_destinations:
+ await self.register_video_destination(destination)
+
+ # Start default media sender.
+ self._media_senders[None] = BaseOutputTransport.MediaSender(
+ self,
+ destination=None,
+ sample_rate=self.sample_rate,
+ audio_chunk_size=self.audio_chunk_size,
+ params=self._params,
+ )
+ await self._media_senders[None].start(frame)
+
+ # Media senders already send both audio and video, so make sure we only
+ # have one media server per shared name.
+ destinations = list(
+ set(self._params.audio_out_destinations + self._params.video_out_destinations)
+ )
+
+ # Start media senders.
+ for destination in destinations:
+ self._media_senders[destination] = BaseOutputTransport.MediaSender(
+ self,
+ destination=destination,
+ sample_rate=self.sample_rate,
+ audio_chunk_size=self.audio_chunk_size,
+ params=self._params,
+ )
+ await self._media_senders[destination].start(frame)
async def stop(self, frame: EndFrame):
- # Let the sink tasks process the queue until they reach this EndFrame.
- await self._sink_clock_queue.put((sys.maxsize, frame.id, frame))
- await self._sink_queue.put(frame)
-
- # At this point we have enqueued an EndFrame and we need to wait for
- # that EndFrame to be processed by the sink tasks. We also need to wait
- # for these tasks before cancelling the video and audio tasks below
- # because they might be still rendering.
- if self._sink_task:
- await self.wait_for_task(self._sink_task)
- if self._sink_clock_task:
- await self.wait_for_task(self._sink_clock_task)
-
- # We can now cancel the video task.
- await self._cancel_video_task()
+ for _, sender in self._media_senders.items():
+ await sender.stop(frame)
async def cancel(self, frame: CancelFrame):
- # Since we are cancelling everything it doesn't matter if we cancel sink
- # tasks first or not.
- await self._cancel_sink_tasks()
- await self._cancel_video_task()
+ for _, sender in self._media_senders.items():
+ await sender.cancel(frame)
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
pass
- async def write_raw_video_frame(self, frame: OutputImageRawFrame):
+ async def register_video_destination(self, destination: str):
pass
- async def write_raw_audio_frames(self, frames: bytes):
+ async def register_audio_destination(self, destination: str):
+ pass
+
+ async def write_raw_video_frame(
+ self, frame: OutputImageRawFrame, destination: Optional[str] = None
+ ):
+ pass
+
+ async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None):
pass
async def send_audio(self, frame: OutputAudioRawFrame):
@@ -150,7 +165,7 @@ class BaseOutputTransport(FrameProcessor):
await self.push_frame(frame, direction)
elif isinstance(frame, (StartInterruptionFrame, StopInterruptionFrame)):
await self.push_frame(frame, direction)
- await self._handle_interruptions(frame)
+ await self._handle_frame(frame)
elif isinstance(frame, TransportMessageUrgentFrame):
await self.send_message(frame)
elif isinstance(frame, SystemFrame):
@@ -160,117 +175,416 @@ class BaseOutputTransport(FrameProcessor):
await self.stop(frame)
# Keep pushing EndFrame down so all the pipeline stops nicely.
await self.push_frame(frame, direction)
- elif isinstance(frame, MixerControlFrame) and self._params.audio_out_mixer:
- await self._params.audio_out_mixer.process_frame(frame)
+ elif isinstance(frame, MixerControlFrame):
+ await self._handle_frame(frame)
# Other frames.
elif isinstance(frame, OutputAudioRawFrame):
- await self._handle_audio(frame)
+ await self._handle_frame(frame)
elif isinstance(frame, (OutputImageRawFrame, SpriteFrame)):
- await self._handle_image(frame)
+ await self._handle_frame(frame)
# TODO(aleix): Images and audio should support presentation timestamps.
elif frame.pts:
- await self._sink_clock_queue.put((frame.pts, frame.id, frame))
+ await self._handle_frame(frame)
elif direction == FrameDirection.UPSTREAM:
await self.push_frame(frame, direction)
else:
- await self._sink_queue.put(frame)
+ await self._handle_frame(frame)
- async def _handle_interruptions(self, frame: Frame):
- if not self.interruptions_allowed:
+ async def _handle_frame(self, frame: Frame):
+ if frame.transport_destination not in self._media_senders:
+ logger.warning(
+ f"{self} destination [{frame.transport_destination}] not registered for frame {frame}"
+ )
return
+ sender = self._media_senders[frame.transport_destination]
+
if isinstance(frame, StartInterruptionFrame):
- # Cancel sink and video tasks.
- await self._cancel_sink_tasks()
- await self._cancel_video_task()
- # Create sink and video tasks.
+ await sender.handle_interruptions(frame)
+ elif isinstance(frame, OutputAudioRawFrame):
+ await sender.handle_audio_frame(frame)
+ elif isinstance(frame, (OutputImageRawFrame, SpriteFrame)):
+ await sender.handle_image_frame(frame)
+ elif isinstance(frame, MixerControlFrame):
+ await sender.handle_mixer_control_frame(frame)
+ elif frame.pts:
+ await sender.handle_timed_frame(frame)
+ else:
+ await sender.handle_sync_frame(frame)
+
+ #
+ # Media Sender
+ #
+
+ class MediaSender:
+ def __init__(
+ self,
+ transport: "BaseOutputTransport",
+ *,
+ destination: Optional[str],
+ sample_rate: int,
+ audio_chunk_size: int,
+ params: TransportParams,
+ ):
+ self._transport = transport
+ self._destination = destination
+ self._sample_rate = sample_rate
+ self._audio_chunk_size = audio_chunk_size
+ self._params = params
+
+ # Buffer to keep track of incoming audio.
+ self._audio_buffer = bytearray()
+
+ # This will be used to resample incoming audio to the output sample rate.
+ self._resampler = create_default_resampler()
+
+ # The user can provide a single mixer, to be used by the default
+ # destination, or a destination/mixer mapping.
+ self._mixer: Optional[BaseAudioMixer] = None
+
+ # These are the images that we should send at our desired framerate.
+ self._video_images = None
+
+ # Indicates if the bot is currently speaking.
+ self._bot_speaking = False
+
+ self._audio_task: Optional[asyncio.Task] = None
+ self._video_task: Optional[asyncio.Task] = None
+ self._clock_task: Optional[asyncio.Task] = None
+
+ @property
+ def sample_rate(self) -> int:
+ return self._sample_rate
+
+ @property
+ def audio_chunk_size(self) -> int:
+ return self._audio_chunk_size
+
+ async def start(self, frame: StartFrame):
+ self._audio_buffer = bytearray()
+
+ # Create all tasks.
self._create_video_task()
- self._create_sink_tasks()
+ self._create_clock_task()
+ self._create_audio_task()
+
+ # Check if we have an audio mixer for our destination.
+ if self._params.audio_out_mixer:
+ if isinstance(self._params.audio_out_mixer, Mapping):
+ self._mixer = self._params.audio_out_mixer.get(self._destination, None)
+ elif not self._destination:
+ # Only use the default mixer if we are the default destination.
+ self._mixer = self._params.audio_out_mixer
+
+ # Start audio mixer.
+ if self._mixer:
+ await self._mixer.start(self._sample_rate)
+
+ async def stop(self, frame: EndFrame):
+ # Let the sink tasks process the queue until they reach this EndFrame.
+ await self._clock_queue.put((sys.maxsize, frame.id, frame))
+ await self._audio_queue.put(frame)
+
+ # At this point we have enqueued an EndFrame and we need to wait for
+ # that EndFrame to be processed by the audio and clock tasks. We
+ # also need to wait for these tasks before cancelling the video task
+ # because it might be still rendering.
+ if self._audio_task:
+ await self._transport.wait_for_task(self._audio_task)
+ if self._clock_task:
+ await self._transport.wait_for_task(self._clock_task)
+
+ # Stop audio mixer.
+ if self._mixer:
+ await self._mixer.stop()
+
+ # We can now cancel the video task.
+ await self._cancel_video_task()
+
+ async def cancel(self, frame: CancelFrame):
+ # Since we are cancelling everything it doesn't matter what task we cancel first.
+ await self._cancel_audio_task()
+ await self._cancel_clock_task()
+ await self._cancel_video_task()
+
+ async def handle_interruptions(self, _: StartInterruptionFrame):
+ if not self._transport.interruptions_allowed:
+ return
+
+ # Cancel tasks.
+ await self._cancel_audio_task()
+ await self._cancel_clock_task()
+ await self._cancel_video_task()
+ # Create tasks.
+ self._create_video_task()
+ self._create_clock_task()
+ self._create_audio_task()
# Let's send a bot stopped speaking if we have to.
await self._bot_stopped_speaking()
- async def _handle_audio(self, frame: OutputAudioRawFrame):
- if not self._params.audio_out_enabled:
- return
+ async def handle_audio_frame(self, frame: OutputAudioRawFrame):
+ if not self._params.audio_out_enabled:
+ return
- # We might need to resample if incoming audio doesn't match the
- # transport sample rate.
- resampled = await self._resampler.resample(
- frame.audio, frame.sample_rate, self._sample_rate
- )
-
- cls = type(frame)
- self._audio_buffer.extend(resampled)
- while len(self._audio_buffer) >= self._audio_chunk_size:
- chunk = cls(
- bytes(self._audio_buffer[: self._audio_chunk_size]),
- sample_rate=self._sample_rate,
- num_channels=frame.num_channels,
+ # We might need to resample if incoming audio doesn't match the
+ # transport sample rate.
+ resampled = await self._resampler.resample(
+ frame.audio, frame.sample_rate, self._sample_rate
)
- await self._sink_queue.put(chunk)
- self._audio_buffer = self._audio_buffer[self._audio_chunk_size :]
- async def _handle_image(self, frame: OutputImageRawFrame | SpriteFrame):
- if not self._params.video_out_enabled:
- return
+ cls = type(frame)
+ self._audio_buffer.extend(resampled)
+ while len(self._audio_buffer) >= self._audio_chunk_size:
+ chunk = cls(
+ bytes(self._audio_buffer[: self._audio_chunk_size]),
+ sample_rate=self._sample_rate,
+ num_channels=frame.num_channels,
+ )
+ await self._audio_queue.put(chunk)
+ self._audio_buffer = self._audio_buffer[self._audio_chunk_size :]
- if self._params.video_out_is_live:
- await self._video_out_queue.put(frame)
- else:
- await self._sink_queue.put(frame)
+ async def handle_image_frame(self, frame: OutputImageRawFrame | SpriteFrame):
+ if not self._params.video_out_enabled:
+ return
- async def _bot_started_speaking(self):
- if not self._bot_speaking:
- logger.debug("Bot started speaking")
- await self.push_frame(BotStartedSpeakingFrame())
- await self.push_frame(BotStartedSpeakingFrame(), FrameDirection.UPSTREAM)
- self._bot_speaking = True
+ if self._params.video_out_is_live and isinstance(frame, OutputImageRawFrame):
+ await self._video_queue.put(frame)
+ elif isinstance(frame, OutputImageRawFrame):
+ await self._set_video_image(frame)
+ else:
+ await self._set_video_images(frame.images)
- async def _bot_stopped_speaking(self):
- if self._bot_speaking:
- logger.debug("Bot stopped speaking")
- await self.push_frame(BotStoppedSpeakingFrame())
- await self.push_frame(BotStoppedSpeakingFrame(), FrameDirection.UPSTREAM)
- self._bot_speaking = False
- # Clean audio buffer (there could be tiny left overs if not multiple
- # to our output chunk size).
- self._audio_buffer = bytearray()
+ async def handle_timed_frame(self, frame: Frame):
+ await self._clock_queue.put((frame.pts, frame.id, frame))
- #
- # Sink tasks
- #
+ async def handle_sync_frame(self, frame: Frame):
+ await self._audio_queue.put(frame)
- def _create_sink_tasks(self):
- if not self._sink_task:
- self._sink_queue = asyncio.Queue()
- self._sink_task = self.create_task(self._sink_task_handler())
- if not self._sink_clock_task:
- self._sink_clock_queue = asyncio.PriorityQueue()
- self._sink_clock_task = self.create_task(self._sink_clock_task_handler())
+ async def handle_mixer_control_frame(self, frame: MixerControlFrame):
+ if self._mixer:
+ await self._mixer.process_frame(frame)
- async def _cancel_sink_tasks(self):
- # Stop sink tasks.
- if self._sink_task:
- await self.cancel_task(self._sink_task)
- self._sink_task = None
- # Stop sink clock tasks.
- if self._sink_clock_task:
- await self.cancel_task(self._sink_clock_task)
- self._sink_clock_task = None
+ #
+ # Audio handling
+ #
- async def _sink_frame_handler(self, frame: Frame):
- if isinstance(frame, OutputImageRawFrame):
- await self._set_video_image(frame)
- elif isinstance(frame, SpriteFrame):
- await self._set_video_images(frame.images)
- elif isinstance(frame, TransportMessageFrame):
- await self.send_message(frame)
+ def _create_audio_task(self):
+ if not self._audio_task and self._params.audio_out_enabled:
+ self._audio_queue = asyncio.Queue()
+ self._audio_task = self._transport.create_task(self._audio_task_handler())
- async def _sink_clock_task_handler(self):
- running = True
- while running:
- try:
- timestamp, _, frame = await self._sink_clock_queue.get()
+ async def _cancel_audio_task(self):
+ if self._audio_task:
+ await self._transport.cancel_task(self._audio_task)
+ self._audio_task = None
+
+ async def _bot_started_speaking(self):
+ if not self._bot_speaking:
+ logger.debug(f"Bot [{self._destination}] started speaking")
+
+ downstream_frame = BotStartedSpeakingFrame()
+ downstream_frame.transport_destination = self._destination
+ upstream_frame = BotStartedSpeakingFrame()
+ upstream_frame.transport_destination = self._destination
+ await self._transport.push_frame(downstream_frame)
+ await self._transport.push_frame(upstream_frame, FrameDirection.UPSTREAM)
+
+ self._bot_speaking = True
+
+ async def _bot_stopped_speaking(self):
+ if self._bot_speaking:
+ logger.debug(f"Bot [{self._destination}] stopped speaking")
+
+ downstream_frame = BotStoppedSpeakingFrame()
+ downstream_frame.transport_destination = self._destination
+ upstream_frame = BotStoppedSpeakingFrame()
+ upstream_frame.transport_destination = self._destination
+ await self._transport.push_frame(downstream_frame)
+ await self._transport.push_frame(upstream_frame, FrameDirection.UPSTREAM)
+
+ self._bot_speaking = False
+
+ # Clean audio buffer (there could be tiny left overs if not multiple
+ # to our output chunk size).
+ self._audio_buffer = bytearray()
+
+ async def _handle_frame(self, frame: Frame):
+ if isinstance(frame, OutputImageRawFrame):
+ await self._set_video_image(frame)
+ elif isinstance(frame, SpriteFrame):
+ await self._set_video_images(frame.images)
+ elif isinstance(frame, TransportMessageFrame):
+ await self._transport.send_message(frame)
+
+ def _next_frame(self) -> AsyncGenerator[Frame, None]:
+ async def without_mixer(vad_stop_secs: float) -> AsyncGenerator[Frame, None]:
+ while True:
+ try:
+ frame = await asyncio.wait_for(
+ self._audio_queue.get(), timeout=vad_stop_secs
+ )
+ yield frame
+ except asyncio.TimeoutError:
+ # Notify the bot stopped speaking upstream if necessary.
+ await self._bot_stopped_speaking()
+
+ async def with_mixer(vad_stop_secs: float) -> AsyncGenerator[Frame, None]:
+ last_frame_time = 0
+ silence = b"\x00" * self._audio_chunk_size
+ while True:
+ try:
+ frame = self._audio_queue.get_nowait()
+ if isinstance(frame, OutputAudioRawFrame):
+ frame.audio = await self._mixer.mix(frame.audio)
+ last_frame_time = time.time()
+ yield frame
+ except asyncio.QueueEmpty:
+ # Notify the bot stopped speaking upstream if necessary.
+ diff_time = time.time() - last_frame_time
+ if diff_time > vad_stop_secs:
+ await self._bot_stopped_speaking()
+ # Generate an audio frame with only the mixer's part.
+ frame = OutputAudioRawFrame(
+ audio=await self._mixer.mix(silence),
+ sample_rate=self._sample_rate,
+ num_channels=self._params.audio_out_channels,
+ )
+ yield frame
+
+ if self._mixer:
+ return with_mixer(BOT_VAD_STOP_SECS)
+ else:
+ return without_mixer(BOT_VAD_STOP_SECS)
+
+ async def _audio_task_handler(self):
+ # Push a BotSpeakingFrame every 200ms, we don't really need to push it
+ # at every audio chunk. If the audio chunk is bigger than 200ms, push at
+ # every audio chunk.
+ TOTAL_CHUNK_MS = self._params.audio_out_10ms_chunks * 10
+ BOT_SPEAKING_CHUNK_PERIOD = max(int(200 / TOTAL_CHUNK_MS), 1)
+ bot_speaking_counter = 0
+ async for frame in self._next_frame():
+ # Notify the bot started speaking upstream if necessary and that
+ # it's actually speaking.
+ if isinstance(frame, TTSAudioRawFrame):
+ await self._bot_started_speaking()
+ if bot_speaking_counter % BOT_SPEAKING_CHUNK_PERIOD == 0:
+ await self._transport.push_frame(BotSpeakingFrame())
+ await self._transport.push_frame(
+ BotSpeakingFrame(), FrameDirection.UPSTREAM
+ )
+ bot_speaking_counter = 0
+ bot_speaking_counter += 1
+
+ # No need to push EndFrame, it's pushed from process_frame().
+ if isinstance(frame, EndFrame):
+ break
+
+ # Handle frame.
+ await self._handle_frame(frame)
+
+ # Also, push frame downstream in case anyone else needs it.
+ await self._transport.push_frame(frame)
+
+ # Send audio.
+ if isinstance(frame, OutputAudioRawFrame):
+ await self._transport.write_raw_audio_frames(frame.audio, self._destination)
+
+ #
+ # Video handling
+ #
+
+ def _create_video_task(self):
+ if not self._video_task and self._params.video_out_enabled:
+ self._video_queue = asyncio.Queue()
+ self._video_task = self._transport.create_task(self._video_task_handler())
+
+ async def _cancel_video_task(self):
+ # Stop video output task.
+ if self._video_task:
+ await self._transport.cancel_task(self._video_task)
+ self._video_task = None
+
+ async def _set_video_image(self, image: OutputImageRawFrame):
+ self._video_images = itertools.cycle([image])
+
+ async def _set_video_images(self, images: List[OutputImageRawFrame]):
+ self._video_images = itertools.cycle(images)
+
+ async def _video_task_handler(self):
+ self._video_start_time = None
+ self._video_frame_index = 0
+ self._video_frame_duration = 1 / self._params.video_out_framerate
+ self._video_frame_reset = self._video_frame_duration * 5
+ while True:
+ if self._params.video_out_is_live:
+ await self._video_is_live_handler()
+ elif self._video_images:
+ image = next(self._video_images)
+ await self._draw_image(image)
+ await asyncio.sleep(self._video_frame_duration)
+ else:
+ await asyncio.sleep(self._video_frame_duration)
+
+ async def _video_is_live_handler(self):
+ image = await self._video_queue.get()
+
+ # We get the start time as soon as we get the first image.
+ if not self._video_start_time:
+ self._video_start_time = time.time()
+ self._video_frame_index = 0
+
+ # Calculate how much time we need to wait before rendering next image.
+ real_elapsed_time = time.time() - self._video_start_time
+ real_render_time = self._video_frame_index * self._video_frame_duration
+ delay_time = self._video_frame_duration + real_render_time - real_elapsed_time
+
+ if abs(delay_time) > self._video_frame_reset:
+ self._video_start_time = time.time()
+ self._video_frame_index = 0
+ elif delay_time > 0:
+ await asyncio.sleep(delay_time)
+ self._video_frame_index += 1
+
+ # Render image
+ await self._draw_image(image)
+
+ self._video_queue.task_done()
+
+ async def _draw_image(self, frame: OutputImageRawFrame):
+ desired_size = (self._params.video_out_width, self._params.video_out_height)
+
+ # TODO: we should refactor in the future to support dynamic resolutions
+ # which is kind of what happens in P2P connections.
+ # We need to add support for that inside the DailyTransport
+ if frame.size != desired_size:
+ image = Image.frombytes(frame.format, frame.size, frame.image)
+ resized_image = image.resize(desired_size)
+ # logger.warning(f"{frame} does not have the expected size {desired_size}, resizing")
+ frame = OutputImageRawFrame(
+ resized_image.tobytes(), resized_image.size, resized_image.format
+ )
+
+ await self._transport.write_raw_video_frame(frame, self._destination)
+
+ #
+ # Clock handling
+ #
+
+ def _create_clock_task(self):
+ if not self._clock_task:
+ self._clock_queue = asyncio.PriorityQueue()
+ self._clock_task = self._transport.create_task(self._clock_task_handler())
+
+ async def _cancel_clock_task(self):
+ if self._clock_task:
+ await self._transport.cancel_task(self._clock_task)
+ self._clock_task = None
+
+ async def _clock_task_handler(self):
+ running = True
+ while running:
+ timestamp, _, frame = await self._clock_queue.get()
# If we hit an EndFrame, we can finish right away.
running = not isinstance(frame, EndFrame)
@@ -279,167 +593,12 @@ class BaseOutputTransport(FrameProcessor):
# has already passed we process it, otherwise we wait until it's
# time to process it.
if running:
- current_time = self.get_clock().get_time()
+ current_time = self._transport.get_clock().get_time()
if timestamp > current_time:
wait_time = nanoseconds_to_seconds(timestamp - current_time)
await asyncio.sleep(wait_time)
- # Handle frame.
- await self._sink_frame_handler(frame)
+ # Push frame downstream.
+ await self._transport.push_frame(frame)
- # Also, push frame downstream in case anyone else needs it.
- await self.push_frame(frame)
-
- self._sink_clock_queue.task_done()
- except asyncio.CancelledError:
- raise
- except Exception as e:
- logger.exception(f"{self} error processing sink clock queue: {e}")
-
- def _next_frame(self) -> AsyncGenerator[Frame, None]:
- async def without_mixer(vad_stop_secs: float) -> AsyncGenerator[Frame, None]:
- while True:
- try:
- frame = await asyncio.wait_for(self._sink_queue.get(), timeout=vad_stop_secs)
- yield frame
- except asyncio.TimeoutError:
- # Notify the bot stopped speaking upstream if necessary.
- await self._bot_stopped_speaking()
-
- async def with_mixer(vad_stop_secs: float) -> AsyncGenerator[Frame, None]:
- last_frame_time = 0
- silence = b"\x00" * self._audio_chunk_size
- while True:
- try:
- frame = self._sink_queue.get_nowait()
- if isinstance(frame, OutputAudioRawFrame):
- frame.audio = await self._params.audio_out_mixer.mix(frame.audio)
- last_frame_time = time.time()
- yield frame
- except asyncio.QueueEmpty:
- # Notify the bot stopped speaking upstream if necessary.
- diff_time = time.time() - last_frame_time
- if diff_time > vad_stop_secs:
- await self._bot_stopped_speaking()
- # Generate an audio frame with only the mixer's part.
- frame = OutputAudioRawFrame(
- audio=await self._params.audio_out_mixer.mix(silence),
- sample_rate=self._sample_rate,
- num_channels=self._params.audio_out_channels,
- )
- yield frame
-
- if self._params.audio_out_mixer:
- return with_mixer(BOT_VAD_STOP_SECS)
- else:
- return without_mixer(BOT_VAD_STOP_SECS)
-
- async def _sink_task_handler(self):
- # Push a BotSpeakingFrame every 200ms, we don't really need to push it
- # at every audio chunk. If the audio chunk is bigger than 200ms, push at
- # every audio chunk.
- TOTAL_CHUNK_MS = self._params.audio_out_10ms_chunks * 10
- BOT_SPEAKING_CHUNK_PERIOD = max(int(200 / TOTAL_CHUNK_MS), 1)
- bot_speaking_counter = 0
- async for frame in self._next_frame():
- # Notify the bot started speaking upstream if necessary and that
- # it's actually speaking.
- if isinstance(frame, TTSAudioRawFrame):
- await self._bot_started_speaking()
- if bot_speaking_counter % BOT_SPEAKING_CHUNK_PERIOD == 0:
- await self.push_frame(BotSpeakingFrame())
- await self.push_frame(BotSpeakingFrame(), FrameDirection.UPSTREAM)
- bot_speaking_counter = 0
- bot_speaking_counter += 1
-
- # No need to push EndFrame, it's pushed from process_frame().
- if isinstance(frame, EndFrame):
- break
-
- # Handle frame.
- await self._sink_frame_handler(frame)
-
- # Also, push frame downstream in case anyone else needs it.
- await self.push_frame(frame)
-
- # Send audio.
- if isinstance(frame, OutputAudioRawFrame):
- await self.write_raw_audio_frames(frame.audio)
-
- #
- # Video task
- #
-
- def _create_video_task(self):
- # Create video output queue and task if needed.
- if not self._video_out_task and self._params.video_out_enabled:
- self._video_out_queue = asyncio.Queue()
- self._video_out_task = self.create_task(self._video_out_task_handler())
-
- async def _cancel_video_task(self):
- # Stop video output task.
- if self._video_out_task and self._params.video_out_enabled:
- await self.cancel_task(self._video_out_task)
- self._video_out_task = None
-
- async def _draw_image(self, frame: OutputImageRawFrame):
- desired_size = (self._params.video_out_width, self._params.video_out_height)
-
- # TODO: we should refactor in the future to support dynamic resolutions
- # which is kind of what happens in P2P connections.
- # We need to add support for that inside the DailyTransport
- if frame.size != desired_size:
- image = Image.frombytes(frame.format, frame.size, frame.image)
- resized_image = image.resize(desired_size)
- # logger.warning(f"{frame} does not have the expected size {desired_size}, resizing")
- frame = OutputImageRawFrame(
- resized_image.tobytes(), resized_image.size, resized_image.format
- )
-
- await self.write_raw_video_frame(frame)
-
- async def _set_video_image(self, image: OutputImageRawFrame):
- self._video_images = itertools.cycle([image])
-
- async def _set_video_images(self, images: List[OutputImageRawFrame]):
- self._video_images = itertools.cycle(images)
-
- async def _video_out_task_handler(self):
- self._video_out_start_time = None
- self._video_out_frame_index = 0
- self._video_out_frame_duration = 1 / self._params.video_out_framerate
- self._video_out_frame_reset = self._video_out_frame_duration * 5
- while True:
- if self._params.video_out_is_live:
- await self._video_out_is_live_handler()
- elif self._video_images:
- image = next(self._video_images)
- await self._draw_image(image)
- await asyncio.sleep(self._video_out_frame_duration)
- else:
- await asyncio.sleep(self._video_out_frame_duration)
-
- async def _video_out_is_live_handler(self):
- image = await self._video_out_queue.get()
-
- # We get the start time as soon as we get the first image.
- if not self._video_out_start_time:
- self._video_out_start_time = time.time()
- self._video_out_frame_index = 0
-
- # Calculate how much time we need to wait before rendering next image.
- real_elapsed_time = time.time() - self._video_out_start_time
- real_render_time = self._video_out_frame_index * self._video_out_frame_duration
- delay_time = self._video_out_frame_duration + real_render_time - real_elapsed_time
-
- if abs(delay_time) > self._video_out_frame_reset:
- self._video_out_start_time = time.time()
- self._video_out_frame_index = 0
- elif delay_time > 0:
- await asyncio.sleep(delay_time)
- self._video_out_frame_index += 1
-
- # Render image
- await self._draw_image(image)
-
- self._video_out_queue.task_done()
+ self._clock_queue.task_done()
diff --git a/src/pipecat/transports/base_transport.py b/src/pipecat/transports/base_transport.py
index b3d537fa4..91270c365 100644
--- a/src/pipecat/transports/base_transport.py
+++ b/src/pipecat/transports/base_transport.py
@@ -5,7 +5,7 @@
#
from abc import abstractmethod
-from typing import Optional
+from typing import List, Mapping, Optional
from pydantic import BaseModel, ConfigDict
@@ -33,7 +33,8 @@ class TransportParams(BaseModel):
audio_out_channels: int = 1
audio_out_bitrate: int = 96000
audio_out_10ms_chunks: int = 4
- audio_out_mixer: Optional[BaseAudioMixer] = None
+ audio_out_mixer: Optional[BaseAudioMixer | Mapping[Optional[str], BaseAudioMixer]] = None
+ audio_out_destinations: List[str] = []
audio_in_enabled: bool = False
audio_in_sample_rate: Optional[int] = None
audio_in_channels: int = 1
@@ -48,6 +49,7 @@ class TransportParams(BaseModel):
video_out_bitrate: int = 800000
video_out_framerate: int = 30
video_out_color_format: str = "RGB"
+ video_out_destinations: List[str] = []
vad_enabled: bool = False
vad_audio_passthrough: bool = False
vad_analyzer: Optional[VADAnalyzer] = None
diff --git a/src/pipecat/transports/local/audio.py b/src/pipecat/transports/local/audio.py
index bc8d2dd16..8bfd7ee34 100644
--- a/src/pipecat/transports/local/audio.py
+++ b/src/pipecat/transports/local/audio.py
@@ -118,7 +118,7 @@ class LocalAudioOutputTransport(BaseOutputTransport):
self._out_stream.close()
self._out_stream = None
- async def write_raw_audio_frames(self, frames: bytes):
+ async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None):
if self._out_stream:
await self.get_event_loop().run_in_executor(
self._executor, self._out_stream.write, frames
diff --git a/src/pipecat/transports/local/tk.py b/src/pipecat/transports/local/tk.py
index 1695d2cab..bed6371c2 100644
--- a/src/pipecat/transports/local/tk.py
+++ b/src/pipecat/transports/local/tk.py
@@ -131,13 +131,15 @@ class TkOutputTransport(BaseOutputTransport):
self._out_stream.close()
self._out_stream = None
- async def write_raw_audio_frames(self, frames: bytes):
+ async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None):
if self._out_stream:
await self.get_event_loop().run_in_executor(
self._executor, self._out_stream.write, frames
)
- async def write_raw_video_frame(self, frame: OutputImageRawFrame):
+ async def write_raw_video_frame(
+ self, frame: OutputImageRawFrame, destination: Optional[str] = None
+ ):
self.get_event_loop().call_soon(self._write_frame_to_tk, frame)
def _write_frame_to_tk(self, frame: OutputImageRawFrame):
diff --git a/src/pipecat/transports/network/fastapi_websocket.py b/src/pipecat/transports/network/fastapi_websocket.py
index 047ae9c69..4a20bc49b 100644
--- a/src/pipecat/transports/network/fastapi_websocket.py
+++ b/src/pipecat/transports/network/fastapi_websocket.py
@@ -203,7 +203,7 @@ class FastAPIWebsocketOutputTransport(BaseOutputTransport):
await super().start(frame)
await self._client.setup(frame)
await self._params.serializer.setup(frame)
- self._send_interval = (self._audio_chunk_size / self.sample_rate) / 2
+ self._send_interval = (self.audio_chunk_size / self.sample_rate) / 2
async def stop(self, frame: EndFrame):
await super().stop(frame)
@@ -229,7 +229,7 @@ class FastAPIWebsocketOutputTransport(BaseOutputTransport):
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
await self._write_frame(frame)
- async def write_raw_audio_frames(self, frames: bytes):
+ async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None):
if self._client.is_closing:
return
diff --git a/src/pipecat/transports/network/small_webrtc.py b/src/pipecat/transports/network/small_webrtc.py
index 6304d8d59..fdd501299 100644
--- a/src/pipecat/transports/network/small_webrtc.py
+++ b/src/pipecat/transports/network/small_webrtc.py
@@ -284,11 +284,13 @@ class SmallWebRTCClient:
)
yield audio_frame
- async def write_raw_audio_frames(self, data: bytes):
+ async def write_raw_audio_frames(self, data: bytes, destination: Optional[str] = None):
if self._can_send() and self._audio_output_track:
await self._audio_output_track.add_audio_bytes(data)
- async def write_raw_video_frame(self, frame: OutputImageRawFrame):
+ async def write_raw_video_frame(
+ self, frame: OutputImageRawFrame, destination: Optional[str] = None
+ ):
if self._can_send() and self._video_output_track:
self._video_output_track.add_video_frame(frame)
@@ -497,10 +499,12 @@ class SmallWebRTCOutputTransport(BaseOutputTransport):
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
await self._client.send_message(frame)
- async def write_raw_audio_frames(self, frames: bytes):
+ async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None):
await self._client.write_raw_audio_frames(frames)
- async def write_raw_video_frame(self, frame: OutputImageRawFrame):
+ async def write_raw_video_frame(
+ self, frame: OutputImageRawFrame, destination: Optional[str] = None
+ ):
await self._client.write_raw_video_frame(frame)
diff --git a/src/pipecat/transports/network/websocket_client.py b/src/pipecat/transports/network/websocket_client.py
index e45a525fd..7e9725a76 100644
--- a/src/pipecat/transports/network/websocket_client.py
+++ b/src/pipecat/transports/network/websocket_client.py
@@ -182,7 +182,7 @@ class WebsocketClientOutputTransport(BaseOutputTransport):
async def start(self, frame: StartFrame):
await super().start(frame)
- self._send_interval = (self._audio_chunk_size / self.sample_rate) / 2
+ self._send_interval = (self.audio_chunk_size / self.sample_rate) / 2
await self._params.serializer.setup(frame)
await self._session.setup(frame)
await self._session.connect()
@@ -202,7 +202,7 @@ class WebsocketClientOutputTransport(BaseOutputTransport):
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
await self._write_frame(frame)
- async def write_raw_audio_frames(self, frames: bytes):
+ async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None):
frame = OutputAudioRawFrame(
audio=frames,
sample_rate=self.sample_rate,
diff --git a/src/pipecat/transports/network/websocket_server.py b/src/pipecat/transports/network/websocket_server.py
index 7ca395922..b930f9fd6 100644
--- a/src/pipecat/transports/network/websocket_server.py
+++ b/src/pipecat/transports/network/websocket_server.py
@@ -194,7 +194,7 @@ class WebsocketServerOutputTransport(BaseOutputTransport):
async def start(self, frame: StartFrame):
await super().start(frame)
await self._params.serializer.setup(frame)
- self._send_interval = (self._audio_chunk_size / self.sample_rate) / 2
+ self._send_interval = (self.audio_chunk_size / self.sample_rate) / 2
async def stop(self, frame: EndFrame):
await super().stop(frame)
@@ -218,7 +218,7 @@ class WebsocketServerOutputTransport(BaseOutputTransport):
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
await self._write_frame(frame)
- async def write_raw_audio_frames(self, frames: bytes):
+ async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None):
if not self._websocket:
# Simulate audio playback with a sleep.
await self._write_audio_sleep()
diff --git a/src/pipecat/transports/services/daily.py b/src/pipecat/transports/services/daily.py
index 23aef56ac..08a2a6030 100644
--- a/src/pipecat/transports/services/daily.py
+++ b/src/pipecat/transports/services/daily.py
@@ -8,10 +8,13 @@ import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
-from typing import Any, Awaitable, Callable, Mapping, Optional
+from typing import Any, Awaitable, Callable, Dict, Mapping, Optional
import aiohttp
from daily import (
+ AudioData,
+ CustomAudioSource,
+ VideoFrame,
VirtualCameraDevice,
VirtualMicrophoneDevice,
VirtualSpeakerDevice,
@@ -19,6 +22,7 @@ from daily import (
from loguru import logger
from pydantic import BaseModel
+from pipecat.audio.utils import create_default_resampler
from pipecat.audio.vad.vad_analyzer import VADAnalyzer, VADParams
from pipecat.frames.frames import (
CancelFrame,
@@ -34,6 +38,7 @@ from pipecat.frames.frames import (
TranscriptionFrame,
TransportMessageFrame,
TransportMessageUrgentFrame,
+ UserAudioRawFrame,
UserImageRawFrame,
UserImageRequestFrame,
)
@@ -149,6 +154,8 @@ class DailyParams(TransportParams):
api_url: Daily API base URL
api_key: Daily API authentication key
dialin_settings: Optional settings for dial-in functionality
+ camera_out_enabled: Whether to enable the main camera output track. If enabled, it still needs `video_out_enabled=True`
+ microphone_out_enabled: Whether to enable the main microphone track. If enabled, it still needs `audio_out_enabled=True`
transcription_enabled: Whether to enable speech transcription
transcription_settings: Configuration for transcription service
"""
@@ -156,6 +163,8 @@ class DailyParams(TransportParams):
api_url: str = "https://api.daily.co/v1"
api_key: str = ""
dialin_settings: Optional[DailyDialinSettings] = None
+ camera_out_enabled: bool = True
+ microphone_out_enabled: bool = True
transcription_enabled: bool = False
transcription_settings: DailyTranscriptionSettings = DailyTranscriptionSettings()
@@ -275,6 +284,7 @@ class DailyTransportClient(EventHandler):
self._transport_name = transport_name
self._participant_id: str = ""
+ self._audio_renderers = {}
self._video_renderers = {}
self._transcription_ids = []
self._transcription_status = None
@@ -310,6 +320,7 @@ class DailyTransportClient(EventHandler):
self._camera: Optional[VirtualCameraDevice] = None
self._mic: Optional[VirtualMicrophoneDevice] = None
self._speaker: Optional[VirtualSpeakerDevice] = None
+ self._audio_sources: Dict[str, CustomAudioSource] = {}
def _camera_name(self):
return f"camera-{self}"
@@ -328,6 +339,14 @@ class DailyTransportClient(EventHandler):
def participant_id(self) -> str:
return self._participant_id
+ @property
+ def in_sample_rate(self) -> int:
+ return self._in_sample_rate
+
+ @property
+ def out_sample_rate(self) -> int:
+ return self._out_sample_rate
+
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
if not self._joined:
return
@@ -365,19 +384,27 @@ class DailyTransportClient(EventHandler):
await asyncio.sleep(0.01)
return None
- async def write_raw_audio_frames(self, frames: bytes):
- if not self._mic:
- return None
+ async def register_audio_destination(self, destination: str):
+ self._audio_sources[destination] = await self.add_custom_audio_track(destination)
+ self._client.update_publishing({"customAudio": {destination: True}})
+ async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None):
future = self._get_event_loop().create_future()
- self._mic.write_frames(frames, completion=completion_callback(future))
+ if not destination and self._mic:
+ self._mic.write_frames(frames, completion=completion_callback(future))
+ elif destination and destination in self._audio_sources:
+ source = self._audio_sources[destination]
+ source.write_frames(frames, completion=completion_callback(future))
+ else:
+ logger.warning(f"{self} unable to write audio frames to destination [{destination}]")
+ future.set_result(None)
await future
- async def write_raw_video_frame(self, frame: OutputImageRawFrame):
- if not self._camera:
- return None
-
- self._camera.write_frame(frame.image)
+ async def write_raw_video_frame(
+ self, frame: OutputImageRawFrame, destination: Optional[str] = None
+ ):
+ if not destination and self._camera:
+ self._camera.write_frame(frame.image)
async def setup(self, frame: StartFrame):
self._in_sample_rate = self._params.audio_in_sample_rate or frame.audio_in_sample_rate
@@ -480,6 +507,9 @@ class DailyTransportClient(EventHandler):
async def _join(self):
future = self._get_event_loop().create_future()
+ camera_enabled = self._params.video_out_enabled and self._params.camera_out_enabled
+ microphone_enabled = self._params.audio_out_enabled and self._params.microphone_out_enabled
+
self._client.join(
self._room_url,
self._token,
@@ -487,13 +517,13 @@ class DailyTransportClient(EventHandler):
client_settings={
"inputs": {
"camera": {
- "isEnabled": self._params.video_out_enabled,
+ "isEnabled": camera_enabled,
"settings": {
"deviceId": self._camera_name(),
},
},
"microphone": {
- "isEnabled": self._params.audio_out_enabled,
+ "isEnabled": microphone_enabled,
"settings": {
"deviceId": self._mic_name(),
"customConstraints": {
@@ -648,6 +678,28 @@ class DailyTransportClient(EventHandler):
if self._joined and self._transcription_status:
await self.update_transcription(self._transcription_ids)
+ async def capture_participant_audio(
+ self,
+ participant_id: str,
+ callback: Callable,
+ audio_source: str = "microphone",
+ ):
+ # Only enable the desired audio source subscription on this participant.
+ if audio_source in ("microphone", "screenAudio"):
+ media = {"media": {audio_source: "subscribed"}}
+ else:
+ media = {"media": {"customAudio": {audio_source: "subscribed"}}}
+
+ await self.update_subscriptions(participant_settings={participant_id: media})
+
+ self._audio_renderers[participant_id] = {audio_source: callback}
+
+ self._client.set_audio_renderer(
+ participant_id,
+ self._audio_data_received,
+ audio_source=audio_source,
+ )
+
async def capture_participant_video(
self,
participant_id: str,
@@ -656,12 +708,15 @@ class DailyTransportClient(EventHandler):
video_source: str = "camera",
color_format: str = "RGB",
):
- # Only enable the desired video source subscription on this participant.
- await self.update_subscriptions(
- participant_settings={participant_id: {"media": {video_source: "subscribed"}}}
- )
+ # Only enable the desired audio source subscription on this participant.
+ if video_source in ("camera", "screenVideo"):
+ media = {"media": {video_source: "subscribed"}}
+ else:
+ media = {"media": {"customVideo": {video_source: "subscribed"}}}
- self._video_renderers[participant_id] = callback
+ await self.update_subscriptions(participant_settings={participant_id: media})
+
+ self._video_renderers[participant_id] = {video_source: callback}
self._client.set_video_renderer(
participant_id,
@@ -670,6 +725,20 @@ class DailyTransportClient(EventHandler):
color_format=color_format,
)
+ async def add_custom_audio_track(self, track_name: str) -> CustomAudioSource:
+ future = self._get_event_loop().create_future()
+
+ audio_source = CustomAudioSource(self._out_sample_rate, 1)
+ self._client.add_custom_audio_track(
+ track_name=track_name,
+ audio_source=audio_source,
+ completion=completion_callback(future),
+ )
+
+ await future
+
+ return audio_source
+
async def update_transcription(self, participants=None, instance_id=None):
future = self._get_event_loop().create_future()
self._client.update_transcription(
@@ -686,7 +755,15 @@ class DailyTransportClient(EventHandler):
)
await future
- async def update_remote_participants(self, remote_participants: Mapping[str, Any] = None):
+ async def update_publishing(self, publishing_settings: Mapping[str, Any]):
+ future = self._get_event_loop().create_future()
+ self._client.update_publishing(
+ publishing_settings=publishing_settings,
+ completion=completion_callback(future),
+ )
+ await future
+
+ async def update_remote_participants(self, remote_participants: Mapping[str, Any]):
future = self._get_event_loop().create_future()
self._client.update_remote_participants(
remote_participants=remote_participants, completion=completion_callback(future)
@@ -773,15 +850,15 @@ class DailyTransportClient(EventHandler):
# Daily (CallClient callbacks)
#
- def _video_frame_received(self, participant_id, video_frame):
- callback = self._video_renderers[participant_id]
- self._call_async_callback(
- callback,
- participant_id,
- video_frame.buffer,
- (video_frame.width, video_frame.height),
- video_frame.color_format,
- )
+ def _audio_data_received(self, participant_id: str, audio_data: AudioData, audio_source: str):
+ callback = self._audio_renderers[participant_id][audio_source]
+ self._call_async_callback(callback, participant_id, audio_data, audio_source)
+
+ def _video_frame_received(
+ self, participant_id: str, video_frame: VideoFrame, video_source: str
+ ):
+ callback = self._video_renderers[participant_id][video_source]
+ self._call_async_callback(callback, participant_id, video_frame, video_source)
def _call_async_callback(self, callback, *args):
future = asyncio.run_coroutine_threadsafe(
@@ -837,6 +914,8 @@ class DailyInputTransport(BaseInputTransport):
# internally to be processed.
self._audio_in_task = None
+ self._resampler = create_default_resampler()
+
self._vad_analyzer: Optional[VADAnalyzer] = params.vad_analyzer
@property
@@ -851,6 +930,9 @@ class DailyInputTransport(BaseInputTransport):
self._audio_in_task = self.create_task(self._audio_in_task_handler())
async def start(self, frame: StartFrame):
+ # Setup client.
+ await self._client.setup(frame)
+
# Parent start.
await super().start(frame)
@@ -859,8 +941,6 @@ class DailyInputTransport(BaseInputTransport):
self._initialized = True
- # Setup client.
- await self._client.setup(frame)
# Join the room.
await self._client.join()
if self._params.audio_in_stream_on_start:
@@ -916,6 +996,31 @@ class DailyInputTransport(BaseInputTransport):
# Audio in
#
+ async def capture_participant_audio(
+ self,
+ participant_id: str,
+ audio_source: str = "camera",
+ ):
+ await self._client.capture_participant_audio(
+ participant_id, self._on_participant_audio_data, audio_source
+ )
+
+ async def _on_participant_audio_data(
+ self, participant_id: str, audio: AudioData, audio_source: str
+ ):
+ resampled = await self._resampler.resample(
+ audio.audio_frames, audio.sample_rate, self._client.out_sample_rate
+ )
+
+ frame = UserAudioRawFrame(
+ user_id=participant_id,
+ audio=resampled,
+ sample_rate=self._client.out_sample_rate,
+ num_channels=audio.num_channels,
+ )
+ frame.transport_source = audio_source
+ await self.push_frame(frame)
+
async def _audio_in_task_handler(self):
while True:
frame = await self._client.read_next_audio_frame()
@@ -934,9 +1039,11 @@ class DailyInputTransport(BaseInputTransport):
color_format: str = "RGB",
):
self._video_renderers[participant_id] = {
- "framerate": framerate,
- "timestamp": 0,
- "render_next_frame": [],
+ video_source: {
+ "framerate": framerate,
+ "timestamp": 0,
+ "render_next_frame": [],
+ }
}
await self._client.capture_participant_video(
@@ -947,12 +1054,14 @@ class DailyInputTransport(BaseInputTransport):
if frame.user_id in self._video_renderers:
self._video_renderers[frame.user_id]["render_next_frame"].append(frame)
- async def _on_participant_video_frame(self, participant_id: str, buffer, size, format):
+ async def _on_participant_video_frame(
+ self, participant_id: str, video_frame: VideoFrame, video_source: str
+ ):
render_frame = False
curr_time = time.time()
- prev_time = self._video_renderers[participant_id]["timestamp"]
- framerate = self._video_renderers[participant_id]["framerate"]
+ prev_time = self._video_renderers[participant_id][video_source]["timestamp"]
+ framerate = self._video_renderers[participant_id][video_source]["framerate"]
# Some times we render frames because of a request.
request_frame = None
@@ -961,20 +1070,23 @@ class DailyInputTransport(BaseInputTransport):
next_time = prev_time + 1 / framerate
render_frame = (next_time - curr_time) < 0.1
- elif self._video_renderers[participant_id]["render_next_frame"]:
- request_frame = self._video_renderers[participant_id]["render_next_frame"].pop(0)
+ elif self._video_renderers[participant_id][video_source]["render_next_frame"]:
+ request_frame = self._video_renderers[participant_id][video_source][
+ "render_next_frame"
+ ].pop(0)
render_frame = True
if render_frame:
frame = UserImageRawFrame(
user_id=participant_id,
request=request_frame,
- image=buffer,
- size=size,
- format=format,
+ image=video_frame.buffer,
+ size=(video_frame.width, video_frame.height),
+ format=video_frame.color_format,
)
+ frame.transport_source = video_source
await self.push_frame(frame)
- self._video_renderers[participant_id]["timestamp"] = curr_time
+ self._video_renderers[participant_id][video_source]["timestamp"] = curr_time
class DailyOutputTransport(BaseOutputTransport):
@@ -999,6 +1111,9 @@ class DailyOutputTransport(BaseOutputTransport):
self._initialized = False
async def start(self, frame: StartFrame):
+ # Setup client.
+ await self._client.setup(frame)
+
# Parent start.
await super().start(frame)
@@ -1007,8 +1122,6 @@ class DailyOutputTransport(BaseOutputTransport):
self._initialized = True
- # Setup client.
- await self._client.setup(frame)
# Join the room.
await self._client.join()
@@ -1032,11 +1145,19 @@ class DailyOutputTransport(BaseOutputTransport):
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
await self._client.send_message(frame)
- async def write_raw_audio_frames(self, frames: bytes):
- await self._client.write_raw_audio_frames(frames)
+ async def register_video_destination(self, destination: str):
+ logger.warning(f"{self} registering video destinations is not supported yet")
- async def write_raw_video_frame(self, frame: OutputImageRawFrame):
- await self._client.write_raw_video_frame(frame)
+ async def register_audio_destination(self, destination: str):
+ await self._client.register_audio_destination(destination)
+
+ async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None):
+ await self._client.write_raw_audio_frames(frames, destination)
+
+ async def write_raw_video_frame(
+ self, frame: OutputImageRawFrame, destination: Optional[str] = None
+ ):
+ await self._client.write_raw_video_frame(frame, destination)
class DailyTransport(BaseTransport):
@@ -1204,6 +1325,14 @@ class DailyTransport(BaseTransport):
async def capture_participant_transcription(self, participant_id: str):
await self._client.capture_participant_transcription(participant_id)
+ async def capture_participant_audio(
+ self,
+ participant_id: str,
+ audio_source: str = "microphone",
+ ):
+ if self._input:
+ await self._input.capture_participant_audio(participant_id, audio_source)
+
async def capture_participant_video(
self,
participant_id: str,
@@ -1216,12 +1345,15 @@ class DailyTransport(BaseTransport):
participant_id, framerate, video_source, color_format
)
+ async def update_publishing(self, publishing_settings: Mapping[str, Any]):
+ await self._client.update_publishing(publishing_settings=publishing_settings)
+
async def update_subscriptions(self, participant_settings=None, profile_settings=None):
await self._client.update_subscriptions(
participant_settings=participant_settings, profile_settings=profile_settings
)
- async def update_remote_participants(self, remote_participants: Mapping[str, Any] = None):
+ async def update_remote_participants(self, remote_participants: Mapping[str, Any]):
await self._client.update_remote_participants(remote_participants=remote_participants)
async def _on_joined(self, data):
diff --git a/src/pipecat/transports/services/livekit.py b/src/pipecat/transports/services/livekit.py
index 2e56ebddf..456a70ea6 100644
--- a/src/pipecat/transports/services/livekit.py
+++ b/src/pipecat/transports/services/livekit.py
@@ -462,7 +462,7 @@ class LiveKitOutputTransport(BaseOutputTransport):
else:
await self._client.send_data(frame.message.encode())
- async def write_raw_audio_frames(self, frames: bytes):
+ async def write_raw_audio_frames(self, frames: bytes, destination: Optional[str] = None):
livekit_audio = self._convert_pipecat_audio_to_livekit(frames)
await self._client.publish_audio(livekit_audio)