From 803b3f2cc4171c2402045a1d03104f1607f50258 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Thu, 29 May 2025 00:22:11 -0400 Subject: [PATCH 1/2] fix: update websocket_client to use FrameProcessorSetup --- CHANGELOG.md | 3 +++ .../transports/network/websocket_client.py | 15 +++++++++++---- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index abf343513..2780c15da 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 messages array. This workaround includes a no_op factory function call is used to satisfy the requirement. +- Fixed `WebsocketClientTransport` to use `FrameProcessorSetup.task_manager` + instead of `StartFrame.task_manager`. + ## [0.0.68] - 2025-05-28 ### Added diff --git a/src/pipecat/transports/network/websocket_client.py b/src/pipecat/transports/network/websocket_client.py index 23a291784..0d0f9875b 100644 --- a/src/pipecat/transports/network/websocket_client.py +++ b/src/pipecat/transports/network/websocket_client.py @@ -24,6 +24,7 @@ from pipecat.frames.frames import ( TransportMessageFrame, TransportMessageUrgentFrame, ) +from pipecat.processors.frame_processor import FrameProcessorSetup from pipecat.serializers.base_serializer import FrameSerializer from pipecat.serializers.protobuf import ProtobufFrameSerializer from pipecat.transports.base_input import BaseInputTransport @@ -68,10 +69,10 @@ class WebsocketClientSession: ) return self._task_manager - async def setup(self, frame: StartFrame): + async def setup(self, task_manager: BaseTaskManager): self._leave_counter += 1 if not self._task_manager: - self._task_manager = frame.task_manager + self._task_manager = task_manager async def connect(self): if self._websocket: @@ -131,11 +132,14 @@ class WebsocketClientInputTransport(BaseInputTransport): self._session = session self._params = params + async def setup(self, setup: FrameProcessorSetup): + await super().setup(setup) + await self._session.setup(setup.task_manager) + async def start(self, frame: StartFrame): await super().start(frame) if self._params.serializer: await self._params.serializer.setup(frame) - await self._session.setup(frame) await self._session.connect() await self.set_transport_ready(frame) @@ -184,12 +188,15 @@ class WebsocketClientOutputTransport(BaseOutputTransport): self._send_interval = 0 self._next_send_time = 0 + async def setup(self, setup: FrameProcessorSetup): + await super().setup(setup) + await self._session.setup(setup.task_manager) + async def start(self, frame: StartFrame): await super().start(frame) self._send_interval = (self.audio_chunk_size / self.sample_rate) / 2 if self._params.serializer: await self._params.serializer.setup(frame) - await self._session.setup(frame) await self._session.connect() await self.set_transport_ready(frame) From ea919704991b02ccb1d14680344ff87ee5b1a3fd Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Thu, 29 May 2025 00:22:41 -0400 Subject: [PATCH 2/2] Update the twilio-chatbot client to work with the updated server, which requires call_sid --- examples/twilio-chatbot/client.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/examples/twilio-chatbot/client.py b/examples/twilio-chatbot/client.py index f52ceaae8..33592da0a 100644 --- a/examples/twilio-chatbot/client.py +++ b/examples/twilio-chatbot/client.py @@ -84,6 +84,7 @@ async def run_client(client_name: str, server_url: str, duration_secs: int): stream_url = get_stream_url_from_twiml(twiml) stream_sid = str(uuid4()) + call_sid = str(uuid4()) transport = WebsocketClientTransport( uri=stream_url, @@ -91,18 +92,14 @@ async def run_client(client_name: str, server_url: str, duration_secs: int): audio_in_enabled=True, audio_out_enabled=True, add_wav_header=False, - serializer=TwilioFrameSerializer(stream_sid), - vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=1.5)), + serializer=TwilioFrameSerializer(stream_sid=stream_sid, call_sid=call_sid), + vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=1.0)), ), ) llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY")) - # We let the audio passthrough so we can record the conversation. - stt = DeepgramSTTService( - api_key=os.getenv("DEEPGRAM_API_KEY"), - audio_passthrough=True, - ) + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) tts = CartesiaTTSService( api_key=os.getenv("CARTESIA_API_KEY"), @@ -157,7 +154,12 @@ async def run_client(client_name: str, server_url: str, duration_secs: int): await transport.output().send_message(message) message = TransportMessageUrgentFrame( - message={"event": "start", "streamSid": stream_sid, "start": {"streamSid": stream_sid}} + message={ + "event": "start", + "streamSid": stream_sid, + "callSid": call_sid, + "start": {"streamSid": stream_sid, "callSid": call_sid}, + } ) await transport.output().send_message(message) @@ -167,6 +169,7 @@ async def run_client(client_name: str, server_url: str, duration_secs: int): async def end_call(): await asyncio.sleep(duration_secs) + logger.info(f"Client {client_name} finished after {duration_secs} seconds.") await task.queue_frame(EndFrame()) runner = PipelineRunner()