From fd5371256752464bf037c821a244f3590b57d7a0 Mon Sep 17 00:00:00 2001 From: Martin Schweiger Date: Fri, 23 May 2025 17:32:12 +0800 Subject: [PATCH 01/14] add models for new streaming service --- src/pipecat/services/assemblyai/models.py | 61 +++++++++++++++++++++++ 1 file changed, 61 insertions(+) create mode 100644 src/pipecat/services/assemblyai/models.py diff --git a/src/pipecat/services/assemblyai/models.py b/src/pipecat/services/assemblyai/models.py new file mode 100644 index 000000000..6a0179c3d --- /dev/null +++ b/src/pipecat/services/assemblyai/models.py @@ -0,0 +1,61 @@ +from typing import List, Literal, Optional + +from pydantic import BaseModel, Field + + +class Word(BaseModel): + """Represents a single word in a transcription with timing and confidence.""" + + start: int + end: int + text: str + confidence: float + word_is_final: bool = Field(..., alias="word_is_final") + + +class BaseMessage(BaseModel): + """Base class for all AssemblyAI WebSocket messages.""" + + type: str + + +class BeginMessage(BaseMessage): + """Message sent when a new session begins.""" + + type: Literal["Begin"] = "Begin" + id: str + expires_at: int + + +class TurnMessage(BaseMessage): + """Message containing transcription data for a turn of speech.""" + + type: Literal["Turn"] = "Turn" + turn_order: int + turn_is_formatted: bool + end_of_turn: bool + transcript: str + end_of_turn_confidence: float + words: List[Word] + + +class TerminationMessage(BaseMessage): + """Message sent when the session is terminated.""" + + type: Literal["Termination"] = "Termination" + audio_duration_seconds: float + session_duration_seconds: float + + +# Union type for all possible message types +AnyMessage = BeginMessage | TurnMessage | TerminationMessage + + +class AssemblyAIConnectionParams(BaseModel): + sample_rate: int = 16000 + encoding: Literal["pcm_s16le", "pcm_mulaw"] = "pcm_s16le" + formatted_finals: bool = False + word_finalization_max_wait_time: Optional[int] = None + end_of_turn_confidence_threshold: Optional[float] = None + min_end_of_turn_silence_when_confident: Optional[int] = None + max_turn_silence: Optional[int] = None From 9b8800ac1d80fa9be378756b2b83bd4f9095e353 Mon Sep 17 00:00:00 2001 From: Martin Schweiger Date: Fri, 23 May 2025 17:32:31 +0800 Subject: [PATCH 02/14] update stt.py --- src/pipecat/services/assemblyai/stt.py | 283 ++++++++++++++++--------- 1 file changed, 178 insertions(+), 105 deletions(-) diff --git a/src/pipecat/services/assemblyai/stt.py b/src/pipecat/services/assemblyai/stt.py index 2153d969e..3dfd9a3f3 100644 --- a/src/pipecat/services/assemblyai/stt.py +++ b/src/pipecat/services/assemblyai/stt.py @@ -5,30 +5,40 @@ # import asyncio -from typing import AsyncGenerator, Optional +import json +from typing import Any, AsyncGenerator, Dict +from urllib.parse import urlencode from loguru import logger from pipecat.frames.frames import ( CancelFrame, EndFrame, - ErrorFrame, Frame, InterimTranscriptionFrame, StartFrame, TranscriptionFrame, + UserStartedSpeakingFrame, + UserStoppedSpeakingFrame, ) +from pipecat.processors.frame_processor import FrameDirection from pipecat.services.stt_service import STTService from pipecat.transcriptions.language import Language from pipecat.utils.time import time_now_iso8601 -from pipecat.utils.tracing.service_decorators import traced_stt + +from .models import ( + AssemblyAIConnectionParams, + BaseMessage, + BeginMessage, + TerminationMessage, + TurnMessage, +) try: - import assemblyai as aai - from assemblyai import AudioEncoding + import websockets except ModuleNotFoundError as e: logger.error(f"Exception: {e}") - logger.error("In order to use AssemblyAI, you need to `pip install pipecat-ai[assemblyai]`.") + logger.error("In order to use AssemblyAI, you need to `pip install websockets`.") raise Exception(f"Missing module: {e}") @@ -37,28 +47,28 @@ class AssemblyAISTTService(STTService): self, *, api_key: str, - sample_rate: Optional[int] = None, - encoding: AudioEncoding = AudioEncoding("pcm_s16le"), - language=Language.EN, # Only English is supported for Realtime + language: Language = Language.EN, # AssemblyAI only supports English + api_endpoint_base_url: str = "wss://streaming.assemblyai.com/v3/ws", + connection_params: AssemblyAIConnectionParams = AssemblyAIConnectionParams(), **kwargs, ): - super().__init__(sample_rate=sample_rate, **kwargs) + self._api_key = api_key + self._language = language + self._api_endpoint_base_url = api_endpoint_base_url + self._connection_params = connection_params - aai.settings.api_key = api_key - self._transcriber: Optional[aai.RealtimeTranscriber] = None + super().__init__(sample_rate=self._connection_params.sample_rate, **kwargs) - self._settings = { - "encoding": encoding, - "language": language, - } + self._websocket = None + self._termination_event = asyncio.Event() + self._received_termination = False + self._connected = False + + self._receive_task = None def can_generate_metrics(self) -> bool: return True - async def set_language(self, language: Language): - logger.info(f"Switching STT language to: [{language}]") - self._settings["language"] = language - async def start(self, frame: StartFrame): await super().start(frame) await self._connect() @@ -72,97 +82,160 @@ class AssemblyAISTTService(STTService): await self._disconnect() async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]: - """Process an audio chunk for STT transcription. + await self._websocket.send(audio) + yield Frame() - This method streams the audio data to AssemblyAI for real-time transcription. - Transcription results are handled asynchronously via callback functions. - - :param audio: Audio data as bytes - :yield: None (transcription frames are pushed via self.push_frame in callbacks) - """ - if self._transcriber: + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if isinstance(frame, UserStartedSpeakingFrame): await self.start_ttfb_metrics() + elif isinstance(frame, UserStoppedSpeakingFrame): + # TODO: if the user opts to use VAD, we should send a ForceEndpoint message await self.start_processing_metrics() - self._transcriber.stream(audio) - yield None - @traced_stt - async def _handle_transcription( - self, transcript: str, is_final: bool, language: Optional[Language] = None - ): - """Handle a transcription result with tracing.""" - await self.stop_ttfb_metrics() - await self.stop_processing_metrics() + def _build_ws_url(self) -> str: + """Build WebSocket URL with query parameters using urllib.parse.urlencode.""" + params = { + k: str(v).lower() if isinstance(v, bool) else v + for k, v in self._connection_params.dict().items() + if v is not None + } + if params: + query_string = urlencode(params) + return f"{self._api_endpoint_base_url}?{query_string}" + return self._api_endpoint_base_url async def _connect(self): - """Establish a connection to the AssemblyAI real-time transcription service. - - This method sets up the necessary callback functions and initializes the - AssemblyAI transcriber. - """ - if self._transcriber: - return - - def on_open(session_opened: aai.RealtimeSessionOpened): - """Callback for when the connection to AssemblyAI is opened.""" - logger.info(f"{self}: Connected to AssemblyAI") - - def on_data(transcript: aai.RealtimeTranscript): - """Callback for handling incoming transcription data. - - This function runs in a separate thread from the main asyncio event loop. - It creates appropriate transcription frames and schedules them to be - pushed to the next stage of the pipeline in the main event loop. - """ - if not transcript.text: - return - - timestamp = time_now_iso8601() - is_final = isinstance(transcript, aai.RealtimeFinalTranscript) - language = self._settings["language"] - - if is_final: - frame = TranscriptionFrame(transcript.text, "", timestamp, language) - else: - frame = InterimTranscriptionFrame(transcript.text, "", timestamp, language) - - asyncio.run_coroutine_threadsafe( - self._handle_transcription(transcript.text, is_final, language), - self.get_event_loop(), + try: + ws_url = self._build_ws_url() + self._websocket = await websockets.connect( + ws_url, + extra_headers={"Authorization": self._api_key}, ) - - # Schedule the coroutine to run in the main event loop - # This is necessary because this callback runs in a different thread - asyncio.run_coroutine_threadsafe(self.push_frame(frame), self.get_event_loop()) - - def on_error(error: aai.RealtimeError): - """Callback for handling errors from AssemblyAI. - - Like on_data, this runs in a separate thread and schedules error - handling in the main event loop. - """ - logger.error(f"{self}: An error occurred: {error}") - # Schedule the coroutine to run in the main event loop - asyncio.run_coroutine_threadsafe( - self.push_frame(ErrorFrame(str(error))), self.get_event_loop() - ) - - def on_close(): - """Callback for when the connection to AssemblyAI is closed.""" - logger.info(f"{self}: Disconnected from AssemblyAI") - - self._transcriber = aai.RealtimeTranscriber( - sample_rate=self.sample_rate, - encoding=self._settings["encoding"], - on_data=on_data, - on_error=on_error, - on_open=on_open, - on_close=on_close, - ) - self._transcriber.connect() + self._connected = True + self._receive_task = asyncio.create_task(self._receive_task_handler()) + except Exception as e: + logger.error(f"Failed to connect to AssemblyAI: {e}") + self._connected = False + raise async def _disconnect(self): - """Disconnect from the AssemblyAI service and clean up resources.""" - if self._transcriber: - self._transcriber.close() - self._transcriber = None + """Disconnect from AssemblyAI WebSocket and wait for termination message.""" + if not self._connected or not self._websocket: + return + + try: + self._termination_event.clear() + self._received_termination = False + + try: + await self._websocket.send(json.dumps({"type": "Terminate"})) + + try: + await asyncio.wait_for( + self._termination_event.wait(), + timeout=5.0, + ) + except asyncio.TimeoutError: + logger.warning("Timed out waiting for termination message from server") + + except Exception as e: + logger.warning(f"Error during termination handshake: {e}") + + finally: + self._connected = False + + try: + if not self._websocket.closed: + await self._websocket.close() + except Exception as e: + logger.warning(f"Error closing WebSocket: {e}") + + if self._receive_task and not self._receive_task.done(): + self._receive_task.cancel() + try: + await self._receive_task + except (asyncio.CancelledError, Exception) as e: + logger.debug(f"Receive task cancelled: {e}") + + async def _receive_task_handler(self): + """Handle incoming WebSocket messages.""" + try: + while self._connected: + try: + message = await self._websocket.recv() + data = json.loads(message) + await self._handle_message(data) + except websockets.exceptions.ConnectionClosedOK: + break + except Exception as e: + logger.error(f"Error processing WebSocket message: {e}") + break + + except Exception as e: + logger.error(f"Fatal error in receive handler: {e}") + + def _parse_message(self, message: Dict[str, Any]) -> BaseMessage: + """Parse a raw message into the appropriate message type.""" + msg_type = message.get("type") + + if msg_type == "Begin": + return BeginMessage.parse_obj(message) + elif msg_type == "Turn": + return TurnMessage.parse_obj(message) + elif msg_type == "Termination": + return TerminationMessage.parse_obj(message) + else: + raise ValueError(f"Unknown message type: {msg_type}") + + async def _handle_message(self, message: Dict[str, Any]): + """Handle AssemblyAI WebSocket messages.""" + try: + parsed_message = self._parse_message(message) + + if isinstance(parsed_message, BeginMessage): + logger.debug( + f"Session Begin: {parsed_message.id} (expires at {parsed_message.expires_at})" + ) + elif isinstance(parsed_message, TurnMessage): + await self._handle_transcription(parsed_message) + elif isinstance(parsed_message, TerminationMessage): + await self._handle_termination(parsed_message) + except Exception as e: + logger.error(f"Error handling message: {e}") + + async def _handle_termination(self, message: TerminationMessage): + """Handle termination message.""" + self._received_termination = True + self._termination_event.set() + + logger.info( + f"Session Terminated: Audio Duration={message.audio_duration_seconds}s, " + f"Session Duration={message.session_duration_seconds}s" + ) + await self.push_frame(EndFrame()) + + async def _handle_transcription(self, message: TurnMessage): + """Handle transcription results.""" + if not message.transcript: + return + await self.stop_ttfb_metrics() + if message.end_of_turn: + await self.push_frame( + TranscriptionFrame( + message.transcript, + "", # participant + time_now_iso8601(), + self._language, + ) + ) + await self.stop_processing_metrics() + else: + await self.push_frame( + InterimTranscriptionFrame( + message.transcript, + "", # participant + time_now_iso8601(), + self._language, + ) + ) From 5479a55b2c4bb77630ab6f84575b086d69efb28d Mon Sep 17 00:00:00 2001 From: Martin Schweiger Date: Mon, 26 May 2025 09:08:56 +0800 Subject: [PATCH 03/14] Add websockets dependency to assemblyai extra --- pyproject.toml | 2 +- src/pipecat/services/assemblyai/stt.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 78bd78773..df0cffc23 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -40,7 +40,7 @@ Website = "https://pipecat.ai" [project.optional-dependencies] anthropic = [ "anthropic~=0.49.0" ] -assemblyai = [ "assemblyai~=0.37.0" ] +assemblyai = [ "websockets~=13.1" ] aws = [ "boto3~=1.37.16", "websockets~=13.1" ] aws-nova-sonic = [ "aws_sdk_bedrock_runtime~=0.0.2" ] azure = [ "azure-cognitiveservices-speech~=1.42.0"] diff --git a/src/pipecat/services/assemblyai/stt.py b/src/pipecat/services/assemblyai/stt.py index 3dfd9a3f3..2e8ad3058 100644 --- a/src/pipecat/services/assemblyai/stt.py +++ b/src/pipecat/services/assemblyai/stt.py @@ -38,7 +38,7 @@ try: import websockets except ModuleNotFoundError as e: logger.error(f"Exception: {e}") - logger.error("In order to use AssemblyAI, you need to `pip install websockets`.") + logger.error('In order to use AssemblyAI, you need to `pip install "pipecat-ai[assemblyai]"`.') raise Exception(f"Missing module: {e}") From 428cee75c53a0b53a69aa4bfc3bb140206ca051e Mon Sep 17 00:00:00 2001 From: Martin Schweiger Date: Mon, 26 May 2025 09:10:55 +0800 Subject: [PATCH 04/14] Add User-Agent header to AssemblyAI websocket connection --- src/pipecat/services/assemblyai/stt.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/pipecat/services/assemblyai/stt.py b/src/pipecat/services/assemblyai/stt.py index 2e8ad3058..bdc2bafd1 100644 --- a/src/pipecat/services/assemblyai/stt.py +++ b/src/pipecat/services/assemblyai/stt.py @@ -11,6 +11,7 @@ from urllib.parse import urlencode from loguru import logger +from pipecat import __version__ as pipecat_version from pipecat.frames.frames import ( CancelFrame, EndFrame, @@ -108,9 +109,13 @@ class AssemblyAISTTService(STTService): async def _connect(self): try: ws_url = self._build_ws_url() + headers = { + "Authorization": self._api_key, + "User-Agent": f"AssemblyAI/1.0 (integration=Pipecat/{pipecat_version})", + } self._websocket = await websockets.connect( ws_url, - extra_headers={"Authorization": self._api_key}, + extra_headers=headers, ) self._connected = True self._receive_task = asyncio.create_task(self._receive_task_handler()) From f1d7eb8565714bef57e521c53419c6a3623c97a8 Mon Sep 17 00:00:00 2001 From: Martin Schweiger Date: Tue, 27 May 2025 21:29:50 +0800 Subject: [PATCH 05/14] final touches --- src/pipecat/services/assemblyai/stt.py | 49 ++++++++++++++++++-------- 1 file changed, 34 insertions(+), 15 deletions(-) diff --git a/src/pipecat/services/assemblyai/stt.py b/src/pipecat/services/assemblyai/stt.py index 9b5c9b9de..7943aa360 100644 --- a/src/pipecat/services/assemblyai/stt.py +++ b/src/pipecat/services/assemblyai/stt.py @@ -51,15 +51,17 @@ class AssemblyAISTTService(STTService): language: Language = Language.EN, # AssemblyAI only supports English api_endpoint_base_url: str = "wss://streaming.assemblyai.com/v3/ws", connection_params: AssemblyAIConnectionParams = AssemblyAIConnectionParams(), + vad_force_turn_endpoint: bool = False, **kwargs, ): self._api_key = api_key self._language = language self._api_endpoint_base_url = api_endpoint_base_url self._connection_params = connection_params + self._vad_force_turn_endpoint = vad_force_turn_endpoint super().__init__(sample_rate=self._connection_params.sample_rate, **kwargs) - + self._websocket = None self._termination_event = asyncio.Event() self._received_termination = False @@ -67,11 +69,16 @@ class AssemblyAISTTService(STTService): self._receive_task = None + self._audio_buffer = bytearray() + self._chunk_size_ms = 50 + self._chunk_size_bytes = 0 + def can_generate_metrics(self) -> bool: return True async def start(self, frame: StartFrame): await super().start(frame) + self._chunk_size_bytes = int(self._chunk_size_ms * self._sample_rate * 2 / 1000) await self._connect() async def stop(self, frame: EndFrame): @@ -83,7 +90,13 @@ class AssemblyAISTTService(STTService): await self._disconnect() async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]: - await self._websocket.send(audio) + self._audio_buffer.extend(audio) + + while len(self._audio_buffer) >= self._chunk_size_bytes: + chunk = bytes(self._audio_buffer[: self._chunk_size_bytes]) + self._audio_buffer = self._audio_buffer[self._chunk_size_bytes :] + await self._websocket.send(chunk) + yield Frame() async def process_frame(self, frame: Frame, direction: FrameDirection): @@ -91,7 +104,8 @@ class AssemblyAISTTService(STTService): if isinstance(frame, UserStartedSpeakingFrame): await self.start_ttfb_metrics() elif isinstance(frame, UserStoppedSpeakingFrame): - # TODO: if the user opts to use VAD, we should send a ForceEndpoint message + if self._vad_force_turn_endpoint: + await self._websocket.send(json.dumps({"type": "ForceEndpoint"})) await self.start_processing_metrics() def _build_ws_url(self) -> str: @@ -133,6 +147,10 @@ class AssemblyAISTTService(STTService): self._termination_event.clear() self._received_termination = False + if len(self._audio_buffer) > 0: + await self._websocket.send(bytes(self._audio_buffer)) + self._audio_buffer.clear() + try: await self._websocket.send(json.dumps({"type": "Terminate"})) @@ -147,21 +165,22 @@ class AssemblyAISTTService(STTService): except Exception as e: logger.warning(f"Error during termination handshake: {e}") - finally: - self._connected = False - - try: - if not self._websocket.closed: - await self._websocket.close() - except Exception as e: - logger.warning(f"Error closing WebSocket: {e}") - - if self._receive_task and not self._receive_task.done(): + if self._receive_task: self._receive_task.cancel() try: await self._receive_task - except (asyncio.CancelledError, Exception) as e: - logger.debug(f"Receive task cancelled: {e}") + except asyncio.CancelledError: + pass + + await self._websocket.close() + + except Exception as e: + logger.error(f"Error during disconnect: {e}") + + finally: + self._websocket = None + self._connected = False + self._receive_task = None async def _receive_task_handler(self): """Handle incoming WebSocket messages.""" From 3ffe8b31551e619cb7257ade890d461ea8a240f4 Mon Sep 17 00:00:00 2001 From: Martin Schweiger Date: Fri, 30 May 2025 09:29:51 +0800 Subject: [PATCH 06/14] remove parse_obj --- src/pipecat/services/assemblyai/stt.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/pipecat/services/assemblyai/stt.py b/src/pipecat/services/assemblyai/stt.py index 7943aa360..89f24bacd 100644 --- a/src/pipecat/services/assemblyai/stt.py +++ b/src/pipecat/services/assemblyai/stt.py @@ -204,11 +204,11 @@ class AssemblyAISTTService(STTService): msg_type = message.get("type") if msg_type == "Begin": - return BeginMessage.parse_obj(message) + return BeginMessage.model_validate(message) elif msg_type == "Turn": - return TurnMessage.parse_obj(message) + return TurnMessage.model_validate(message) elif msg_type == "Termination": - return TerminationMessage.parse_obj(message) + return TerminationMessage.model_validate(message) else: raise ValueError(f"Unknown message type: {msg_type}") From 096e854d503fddd50da181402a41f8c827b60ac3 Mon Sep 17 00:00:00 2001 From: Martin Schweiger Date: Fri, 30 May 2025 09:31:20 +0800 Subject: [PATCH 07/14] remove .dict() --- src/pipecat/services/assemblyai/stt.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pipecat/services/assemblyai/stt.py b/src/pipecat/services/assemblyai/stt.py index 89f24bacd..ed4d9d55b 100644 --- a/src/pipecat/services/assemblyai/stt.py +++ b/src/pipecat/services/assemblyai/stt.py @@ -112,7 +112,7 @@ class AssemblyAISTTService(STTService): """Build WebSocket URL with query parameters using urllib.parse.urlencode.""" params = { k: str(v).lower() if isinstance(v, bool) else v - for k, v in self._connection_params.dict().items() + for k, v in self._connection_params.model_dump().items() if v is not None } if params: From 7f2e848a5c4a754805d1945521821e545175175a Mon Sep 17 00:00:00 2001 From: Martin Schweiger Date: Fri, 30 May 2025 09:40:46 +0800 Subject: [PATCH 08/14] use FrameProcessor class methods --- src/pipecat/services/assemblyai/stt.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/pipecat/services/assemblyai/stt.py b/src/pipecat/services/assemblyai/stt.py index ed4d9d55b..8f6569ffe 100644 --- a/src/pipecat/services/assemblyai/stt.py +++ b/src/pipecat/services/assemblyai/stt.py @@ -132,7 +132,7 @@ class AssemblyAISTTService(STTService): extra_headers=headers, ) self._connected = True - self._receive_task = asyncio.create_task(self._receive_task_handler()) + self._receive_task = self.create_task(self._receive_task_handler()) except Exception as e: logger.error(f"Failed to connect to AssemblyAI: {e}") self._connected = False @@ -166,11 +166,7 @@ class AssemblyAISTTService(STTService): logger.warning(f"Error during termination handshake: {e}") if self._receive_task: - self._receive_task.cancel() - try: - await self._receive_task - except asyncio.CancelledError: - pass + await self.cancel_task(self._receive_task) await self._websocket.close() From db838634e73fb055216ac75cef20412c0817f7fe Mon Sep 17 00:00:00 2001 From: Martin Schweiger Date: Fri, 30 May 2025 10:00:31 +0800 Subject: [PATCH 09/14] fix: double finals bug --- src/pipecat/services/assemblyai/stt.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/pipecat/services/assemblyai/stt.py b/src/pipecat/services/assemblyai/stt.py index 8f6569ffe..2081e6347 100644 --- a/src/pipecat/services/assemblyai/stt.py +++ b/src/pipecat/services/assemblyai/stt.py @@ -240,7 +240,9 @@ class AssemblyAISTTService(STTService): if not message.transcript: return await self.stop_ttfb_metrics() - if message.end_of_turn: + if message.end_of_turn and ( + not self._connection_params.formatted_finals or message.turn_is_formatted + ): await self.push_frame( TranscriptionFrame( message.transcript, From b5eac8dfed3f9f85ff8100eafc03ec2c1c8bb9a8 Mon Sep 17 00:00:00 2001 From: Martin Schweiger Date: Fri, 30 May 2025 10:39:07 +0800 Subject: [PATCH 10/14] add message to TranscriptionFrame result --- src/pipecat/services/assemblyai/stt.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/pipecat/services/assemblyai/stt.py b/src/pipecat/services/assemblyai/stt.py index 2081e6347..bf036eb62 100644 --- a/src/pipecat/services/assemblyai/stt.py +++ b/src/pipecat/services/assemblyai/stt.py @@ -249,6 +249,7 @@ class AssemblyAISTTService(STTService): "", # participant time_now_iso8601(), self._language, + message, ) ) await self.stop_processing_metrics() @@ -259,5 +260,6 @@ class AssemblyAISTTService(STTService): "", # participant time_now_iso8601(), self._language, + message, ) ) From 8400539acfd09d53823103ba54af40ea94013843 Mon Sep 17 00:00:00 2001 From: Martin Schweiger Date: Fri, 30 May 2025 10:40:01 +0800 Subject: [PATCH 11/14] set formatted_finals true by default --- src/pipecat/services/assemblyai/models.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pipecat/services/assemblyai/models.py b/src/pipecat/services/assemblyai/models.py index 6a0179c3d..58b69fdf5 100644 --- a/src/pipecat/services/assemblyai/models.py +++ b/src/pipecat/services/assemblyai/models.py @@ -54,7 +54,7 @@ AnyMessage = BeginMessage | TurnMessage | TerminationMessage class AssemblyAIConnectionParams(BaseModel): sample_rate: int = 16000 encoding: Literal["pcm_s16le", "pcm_mulaw"] = "pcm_s16le" - formatted_finals: bool = False + formatted_finals: bool = True word_finalization_max_wait_time: Optional[int] = None end_of_turn_confidence_threshold: Optional[float] = None min_end_of_turn_silence_when_confident: Optional[int] = None From e5da3f6e686bc7e81a7b1c7c4522283db5c9e9b7 Mon Sep 17 00:00:00 2001 From: Martin Schweiger Date: Fri, 30 May 2025 10:55:19 +0800 Subject: [PATCH 12/14] add tracing --- src/pipecat/services/assemblyai/stt.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/pipecat/services/assemblyai/stt.py b/src/pipecat/services/assemblyai/stt.py index bf036eb62..7e9fd0853 100644 --- a/src/pipecat/services/assemblyai/stt.py +++ b/src/pipecat/services/assemblyai/stt.py @@ -26,6 +26,7 @@ from pipecat.processors.frame_processor import FrameDirection from pipecat.services.stt_service import STTService from pipecat.transcriptions.language import Language from pipecat.utils.time import time_now_iso8601 +from pipecat.utils.tracing.service_decorators import traced_stt from .models import ( AssemblyAIConnectionParams, @@ -108,6 +109,11 @@ class AssemblyAISTTService(STTService): await self._websocket.send(json.dumps({"type": "ForceEndpoint"})) await self.start_processing_metrics() + @traced_stt + async def _trace_transcription(self, transcript: str, is_final: bool, language: Language): + """Record transcription event for tracing.""" + pass + def _build_ws_url(self) -> str: """Build WebSocket URL with query parameters using urllib.parse.urlencode.""" params = { @@ -252,6 +258,7 @@ class AssemblyAISTTService(STTService): message, ) ) + await self._trace_transcription(message.transcript, True, self._language) await self.stop_processing_metrics() else: await self.push_frame( From b3652e65271516beba89000b06bc119aa9ba6313 Mon Sep 17 00:00:00 2001 From: Martin Schweiger Date: Fri, 30 May 2025 17:07:54 +0800 Subject: [PATCH 13/14] set vad_force_turn_endpoint to True by default --- src/pipecat/services/assemblyai/stt.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pipecat/services/assemblyai/stt.py b/src/pipecat/services/assemblyai/stt.py index 7e9fd0853..c7e1d9e48 100644 --- a/src/pipecat/services/assemblyai/stt.py +++ b/src/pipecat/services/assemblyai/stt.py @@ -52,7 +52,7 @@ class AssemblyAISTTService(STTService): language: Language = Language.EN, # AssemblyAI only supports English api_endpoint_base_url: str = "wss://streaming.assemblyai.com/v3/ws", connection_params: AssemblyAIConnectionParams = AssemblyAIConnectionParams(), - vad_force_turn_endpoint: bool = False, + vad_force_turn_endpoint: bool = True, **kwargs, ): self._api_key = api_key From 321ea27c34fd399c4d6744fe8c7174df11a23a38 Mon Sep 17 00:00:00 2001 From: Martin Schweiger Date: Fri, 30 May 2025 17:15:58 +0800 Subject: [PATCH 14/14] changelog entry --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index de1a9c1f4..c971a1c23 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Updated AssemblyAI STT service to support their latest streaming + speech-to-text model with improved transcription latency and endpointing. + - You can now access STT service results through the new `TranscriptionFrame.result` and `InterimTranscriptionFrame.result` field. This is useful in case you use some specific settings for the STT and you want to