diff --git a/examples/foundational/01a-local-audio.py b/examples/foundational/01a-local-audio.py index ba593cc04..633697684 100644 --- a/examples/foundational/01a-local-audio.py +++ b/examples/foundational/01a-local-audio.py @@ -16,8 +16,7 @@ from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineTask from pipecat.services.cartesia import CartesiaTTSService -from pipecat.transports.base_transport import TransportParams -from pipecat.transports.local.audio import LocalAudioTransport +from pipecat.transports.local.audio import LocalAudioTransport, LocalTransportParams load_dotenv(override=True) @@ -26,7 +25,7 @@ logger.add(sys.stderr, level="DEBUG") async def main(): - transport = LocalAudioTransport(TransportParams(audio_out_enabled=True)) + transport = LocalAudioTransport(LocalTransportParams(audio_out_enabled=True)) tts = CartesiaTTSService( api_key=os.getenv("CARTESIA_API_KEY"), @@ -41,7 +40,7 @@ async def main(): await asyncio.sleep(1) await task.queue_frames([TTSSpeakFrame("Hello there, how is it going!"), EndFrame()]) - runner = PipelineRunner() + runner = PipelineRunner(handle_sigint=False if sys.platform == "win32" else True) await asyncio.gather(runner.run(task), say_something()) diff --git a/examples/foundational/13a-whisper-local.py b/examples/foundational/13a-whisper-local.py index 7e2e14887..2d3cd5a57 100644 --- a/examples/foundational/13a-whisper-local.py +++ b/examples/foundational/13a-whisper-local.py @@ -16,8 +16,7 @@ from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineTask from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.services.whisper import WhisperSTTService -from pipecat.transports.base_transport import TransportParams -from pipecat.transports.local.audio import LocalAudioTransport +from pipecat.transports.local.audio import LocalAudioTransport, LocalTransportParams 
load_dotenv(override=True) @@ -34,7 +33,7 @@ class TranscriptionLogger(FrameProcessor): async def main(): - transport = LocalAudioTransport(TransportParams(audio_in_enabled=True)) + transport = LocalAudioTransport(LocalTransportParams(audio_in_enabled=True)) stt = WhisperSTTService() @@ -44,7 +43,7 @@ async def main(): task = PipelineTask(pipeline) - runner = PipelineRunner() + runner = PipelineRunner(handle_sigint=False if sys.platform == "win32" else True) await runner.run(task) diff --git a/examples/local-input-select-stt/README.md b/examples/local-input-select-stt/README.md new file mode 100644 index 000000000..c5aa58b35 --- /dev/null +++ b/examples/local-input-select-stt/README.md @@ -0,0 +1,88 @@ + +# Pipecat Audio Transcription Example 🚀🎙️ + +Welcome to the **Pipecat Audio Transcription Example**! + +This project showcases how to integrate the awesome [pipecat](https://github.com/pipecat-ai/pipecat) library with a neat textual interface (powered by [Textual](https://github.com/Textualize/textual)) to select audio devices and perform real-time speech-to-text (STT) transcription using [Whisper](https://github.com/openai/whisper). + +> **Note:** Although the script allows you to select both input and output audio devices, this example only utilizes the audio **input** for transcription. + +--- + +## 🎉 Features + +- **Interactive Audio Device Selection:** + Choose your preferred audio input device using a cool, textual UI. + +- **State-of-the-Art Transcription:** + Leverage Whisper's large model (running on CUDA) for high-quality, real-time STT. + +- **Live Transcription Logging:** + Watch your spoken words transform into text on your console instantly. + +- **Easy Setup:** + Everything you need is in the [`requirements.txt`](./requirements.txt). + +--- + +## 🎥 Demo + +Get a quick glimpse of the app in action! 
+*(Don't worry – I'll be adding a GIF demo here soon!)* + +![Demo GIF](demo.gif) + +--- + +## 🔧 Installation + + +Install Dependencies: + +```bash +pip install -r requirements.txt +``` + + +--- + +## 🚀 Usage + +Run the main script: + +```bash +python bot.py +``` + +When the app launches, you'll see a textual interface that lets you select your audio input device. Once selected, the app will begin capturing audio and transcribing it with Whisper. + +--- + +## ⚙️ How It Works + + +1. **LocalAudioTransport:** + Captures audio from your chosen input device. + +2. **WhisperSTTService:** + Processes the audio stream using Whisper's large model for speech-to-text conversion. + +3. **TranscriptionLogger:** + Logs the transcribed text to the console as soon as it's processed. + + +--- + +## 📦 Dependencies + +The project relies on: + +- [pipecat](https://github.com/pipecat-ai/pipecat) – For building the audio processing pipeline. +- [Textual](https://github.com/Textualize/textual) – For the interactive terminal UI. +- [Whisper](https://github.com/openai/whisper) – For state-of-the-art STT transcription. + +--- + +## Example improvements: + +Planned improvements to this example include local LLM calls and audio output. 
diff --git a/examples/local-input-select-stt/bot.py b/examples/local-input-select-stt/bot.py new file mode 100644 index 000000000..1cedbf96c --- /dev/null +++ b/examples/local-input-select-stt/bot.py @@ -0,0 +1,65 @@ +# +# Copyright (c) 2024โ€“2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import asyncio +import sys +from typing import Tuple + +from dotenv import load_dotenv +from loguru import logger +from select_audio_device import AudioDevice, run_device_selector + +from pipecat.frames.frames import Frame, TranscriptionFrame +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineTask +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.services.whisper import Model, WhisperSTTService +from pipecat.transports.local.audio import LocalAudioTransport, LocalTransportParams + +load_dotenv(override=True) + +logger.remove(0) +logger.add(sys.stderr, level="DEBUG") + + +class TranscriptionLogger(FrameProcessor): + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + + if isinstance(frame, TranscriptionFrame): + print(f"Transcription: {frame.text}") + + +async def main(input_device: int, output_device: int): + transport = LocalAudioTransport( + LocalTransportParams( + audio_in_enabled=True, + audio_out_enabled=False, + input_device_index=input_device, + output_device_index=output_device, + ) + ) + + stt = WhisperSTTService(device="cuda", model=Model.LARGE, no_speech_prob=0.3) + + tl = TranscriptionLogger() + + pipeline = Pipeline([transport.input(), stt, tl]) + + task = PipelineTask(pipeline) + + runner = PipelineRunner(handle_sigint=False if sys.platform == "win32" else True) + + await asyncio.gather(runner.run(task)) + + +if __name__ == "__main__": + res: Tuple[AudioDevice, AudioDevice, int] = asyncio.run( + run_device_selector() # runs the textual app 
that allows to select input device + ) + + asyncio.run(main(res[0].index, res[1].index)) diff --git a/examples/local-input-select-stt/demo.gif b/examples/local-input-select-stt/demo.gif new file mode 100644 index 000000000..f4beb2cb7 Binary files /dev/null and b/examples/local-input-select-stt/demo.gif differ diff --git a/examples/local-input-select-stt/requirements.txt b/examples/local-input-select-stt/requirements.txt new file mode 100644 index 000000000..9e2f3e592 --- /dev/null +++ b/examples/local-input-select-stt/requirements.txt @@ -0,0 +1,8 @@ +--extra-index-url https://download.pytorch.org/whl/cu124 +torch==2.5.0+cu124 +torchvision +torchaudio +pipecat[whisper, openai] +textual==1.0.0 +pydantic-settings==2.7.1 +pyaudio==0.2.14 diff --git a/examples/local-input-select-stt/select_audio_device.py b/examples/local-input-select-stt/select_audio_device.py new file mode 100644 index 000000000..2993eeafc --- /dev/null +++ b/examples/local-input-select-stt/select_audio_device.py @@ -0,0 +1,247 @@ +from typing import List, Optional, Tuple + +import pyaudio +from pydantic import BaseModel, ConfigDict, Field +from pydantic_settings import BaseSettings +from textual.app import App, ComposeResult +from textual.containers import Container +from textual.widgets import Footer, Header, Label, ListItem, ListView, Select +from textual.widgets.option_list import Option + +# โ”€โ”€โ”€ DATA MODELS โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + + +class HostApi(BaseModel): + index: int + struct_version: int = Field(..., alias="structVersion") + type: int + name: str + device_count: int = Field(..., alias="deviceCount") + default_input_device: int = Field(..., alias="defaultInputDevice") + default_output_device: int = Field(..., alias="defaultOutputDevice") + + +class AudioDevice(BaseModel): + model_config = 
ConfigDict(populate_by_name=True) + index: int + struct_version: int = Field(..., alias="structVersion") + name: str + host_api: int = Field(..., alias="hostApi") + max_input_channels: int = Field(..., alias="maxInputChannels") + max_output_channels: int = Field(..., alias="maxOutputChannels") + default_low_input_latency: float = Field(..., alias="defaultLowInputLatency") + default_low_output_latency: float = Field(..., alias="defaultLowOutputLatency") + default_high_input_latency: float = Field(..., alias="defaultHighInputLatency") + default_high_output_latency: float = Field(..., alias="defaultHighOutputLatency") + default_sample_rate: float = Field(..., alias="defaultSampleRate") + + +# โ”€โ”€โ”€ SETTINGS MODEL โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + + +class AudioSettings(BaseSettings): # to save settings to a file + host_api: Optional[int] = None + input_device: Optional[AudioDevice] = None + output_device: Optional[AudioDevice] = None + + class Config: + env_file = "settings.env" # or adjust as needed + + def save_to_json(self, filepath: str) -> None: + with open(filepath, "w") as f: + f.write(self.model_dump_json(indent=2)) + + +# โ”€โ”€โ”€ TEXTUAL APP โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + + +class AudioDeviceSelectorApp(App): + CSS = """ + Screen { + align: center middle; + } + #container { + width: 80%; + border: round green; + padding: 1 2; + } + """ + + def __init__( + self, + default_host_api: Optional[int] = None, + default_input_device: Optional[AudioDevice] = None, + default_output_device: Optional[AudioDevice] = None, + **kwargs, + ) -> None: + super().__init__(**kwargs) + # Save defaults passed from settings. 
+ self.default_host_api: Optional[int] = default_host_api + self.default_input_device: Optional[AudioDevice] = default_input_device + self.default_output_device: Optional[AudioDevice] = default_output_device + + self.pyaudio_instance = pyaudio.PyAudio() + + # Static datastructures: host APIs and devices as wellโ€typed models. + self.host_apis: List[HostApi] = [] + self.current_host_api: Optional[int] = None + + self.all_input_devices: List[AudioDevice] = [] + self.all_output_devices: List[AudioDevice] = [] + self.input_devices: List[AudioDevice] = [] + self.output_devices: List[AudioDevice] = [] + + # Stage management: first select input, then output. + self.stage: str = "input" + self.selected_input_device: Optional[AudioDevice] = None + self.selected_output_device: Optional[AudioDevice] = None + host_api_count: int = self.pyaudio_instance.get_host_api_count() + for i in range(host_api_count): + raw_api = self.pyaudio_instance.get_host_api_info_by_index(i) + # Inject the index (if not already present) + raw_api["index"] = i + try: + api = HostApi.parse_obj(raw_api) + self.host_apis.append(api) + except Exception as e: + # Skip APIs that don't conform. + continue + + def compose(self) -> ComposeResult: + options: List[Tuple[str, Option]] = [ + ( + api.name, + Option( + prompt=str(api.name) if api.name else f"Host API {api.index}", + id=str(api.index), + ), + ) + for api in self.host_apis + ] + + yield Header() + + yield Footer() + with Container(id="container"): + yield Label("Select Host API:", id="host-api-label") + # Create the Select widget with no options initially. + self.host_api_select: Select[HostApi] = Select(options=options, id="host-api-select") + yield self.host_api_select + self.prompt = Label("Select Input Audio Device:", id="prompt") + yield self.prompt + self.list_view = ListView(id="device-list") + yield self.list_view + + def on_mount(self) -> None: + # Populate host APIs from PyAudio. + + # Build the dropdown options. 
+ + self.host_api_select.refresh() # Force a redraw + + # Determine the default host API. + if self.default_host_api is not None: + self.current_host_api = self.default_host_api + else: + default_api_info = self.pyaudio_instance.get_default_host_api_info() + self.current_host_api = default_api_info["index"] + + # Delay setting the dropdown's value until the widget is fully initialized. + self.set_timer( + 0, + lambda: setattr(self.host_api_select, "value", str(self.current_host_api)), + ) + + # Load all devices and parse them into AudioDevice objects. + device_count: int = self.pyaudio_instance.get_device_count() + for i in range(device_count): + raw_device = self.pyaudio_instance.get_device_info_by_index(i) + raw_device["index"] = i + try: + device = AudioDevice.parse_obj(raw_device) + except Exception as e: + # Skip devices missing required fields. + continue + if device.max_input_channels > 0: + self.all_input_devices.append(device) + if device.max_output_channels > 0: + self.all_output_devices.append(device) + + self.filter_devices() + self.populate_list(self.input_devices) + if self.default_input_device: + self._select_default_in_list(self.default_input_device) + + def filter_devices(self) -> None: + """Filter devices based on the selected host API.""" + self.input_devices = [ + d for d in self.all_input_devices if d.host_api == self.current_host_api + ] + self.output_devices = [ + d for d in self.all_output_devices if d.host_api == self.current_host_api + ] + + def populate_list(self, devices: List[AudioDevice]) -> None: + """Populate the ListView with a list of AudioDevice objects.""" + self.list_view.clear() + for dev in devices: + item_text: str = f"{dev.name} (Index: {dev.index})" + item = ListItem(Label(item_text)) + # Attach the AudioDevice instance to the widget. 
+ item.device_info = dev # type: ignore + self.list_view.append(item) + + def _select_default_in_list(self, default_device: AudioDevice) -> None: + """Pre-select the default device if present in the current list.""" + for idx, item in enumerate(self.list_view.children): + if hasattr(item, "device_info") and item.device_info.index == default_device.index: + self.list_view.index = idx + break + + async def on_select_changed(self, event: Select.Changed) -> None: + """Handle changes in the host API dropdown.""" + if event.select.id == "host-api-select": + self.current_host_api = int(event.value.id) + self.filter_devices() + if self.stage == "input": + self.populate_list(self.input_devices) + if self.default_input_device: + self._select_default_in_list(self.default_input_device) + elif self.stage == "output": + self.populate_list(self.output_devices) + if self.default_output_device: + self._select_default_in_list(self.default_output_device) + + async def on_list_view_selected(self, message: ListView.Selected) -> None: + """Record device selection and switch stages.""" + selected_item = message.item + device_info: AudioDevice = selected_item.device_info # type: ignore + if self.stage == "input": + self.selected_input_device = device_info + self.stage = "output" + self.prompt.update("Select Output Audio Device:") + self.populate_list(self.output_devices) + if self.default_output_device: + self._select_default_in_list(self.default_output_device) + elif self.stage == "output": + self.selected_output_device = device_info + await self.action_quit() + + +# โ”€โ”€โ”€ HELPER FUNCTIONS โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + + +async def run_device_selector( + default_host_api: Optional[int] = None, + default_input_device: Optional[AudioDevice] = None, + default_output_device: Optional[AudioDevice] = None, +) -> Tuple[AudioDevice, AudioDevice, 
int]: + app = AudioDeviceSelectorApp( + default_host_api=default_host_api, + default_input_device=default_input_device, + default_output_device=default_output_device, + ) + await app.run_async() + + # The current_host_api is guaranteed to be set. + return app.selected_input_device, app.selected_output_device, app.current_host_api # type: ignore diff --git a/src/pipecat/transports/local/audio.py b/src/pipecat/transports/local/audio.py index 9827283bc..30bdaaf79 100644 --- a/src/pipecat/transports/local/audio.py +++ b/src/pipecat/transports/local/audio.py @@ -116,7 +116,7 @@ class LocalAudioOutputTransport(BaseOutputTransport): ) -class LocalAudioTransport(LocalTransportParams): +class LocalAudioTransport(BaseTransport): def __init__(self, params: LocalTransportParams): super().__init__() self._params = params