Merge pull request #1917 from pipecat-ai/mb/fix-twilio-chatbot-client

fix: update websocket_client to use FrameProcessorSetup.task_manager
This commit is contained in:
Mark Backman
2025-05-29 14:05:27 -04:00
committed by GitHub
3 changed files with 25 additions and 12 deletions

View File

@@ -32,6 +32,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
messages array. This workaround includes a no_op factory function call is
used to satisfy the requirement.
- Fixed `WebsocketClientTransport` to use `FrameProcessorSetup.task_manager`
instead of `StartFrame.task_manager`.
## [0.0.68] - 2025-05-28
### Added

View File

@@ -84,6 +84,7 @@ async def run_client(client_name: str, server_url: str, duration_secs: int):
stream_url = get_stream_url_from_twiml(twiml)
stream_sid = str(uuid4())
call_sid = str(uuid4())
transport = WebsocketClientTransport(
uri=stream_url,
@@ -91,18 +92,14 @@ async def run_client(client_name: str, server_url: str, duration_secs: int):
audio_in_enabled=True,
audio_out_enabled=True,
add_wav_header=False,
serializer=TwilioFrameSerializer(stream_sid),
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=1.5)),
serializer=TwilioFrameSerializer(stream_sid=stream_sid, call_sid=call_sid),
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=1.0)),
),
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
# We let the audio passthrough so we can record the conversation.
stt = DeepgramSTTService(
api_key=os.getenv("DEEPGRAM_API_KEY"),
audio_passthrough=True,
)
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
@@ -157,7 +154,12 @@ async def run_client(client_name: str, server_url: str, duration_secs: int):
await transport.output().send_message(message)
message = TransportMessageUrgentFrame(
message={"event": "start", "streamSid": stream_sid, "start": {"streamSid": stream_sid}}
message={
"event": "start",
"streamSid": stream_sid,
"callSid": call_sid,
"start": {"streamSid": stream_sid, "callSid": call_sid},
}
)
await transport.output().send_message(message)
@@ -167,6 +169,7 @@ async def run_client(client_name: str, server_url: str, duration_secs: int):
async def end_call():
await asyncio.sleep(duration_secs)
logger.info(f"Client {client_name} finished after {duration_secs} seconds.")
await task.queue_frame(EndFrame())
runner = PipelineRunner()

View File

@@ -24,6 +24,7 @@ from pipecat.frames.frames import (
TransportMessageFrame,
TransportMessageUrgentFrame,
)
from pipecat.processors.frame_processor import FrameProcessorSetup
from pipecat.serializers.base_serializer import FrameSerializer
from pipecat.serializers.protobuf import ProtobufFrameSerializer
from pipecat.transports.base_input import BaseInputTransport
@@ -68,10 +69,10 @@ class WebsocketClientSession:
)
return self._task_manager
async def setup(self, frame: StartFrame):
async def setup(self, task_manager: BaseTaskManager):
self._leave_counter += 1
if not self._task_manager:
self._task_manager = frame.task_manager
self._task_manager = task_manager
async def connect(self):
if self._websocket:
@@ -131,11 +132,14 @@ class WebsocketClientInputTransport(BaseInputTransport):
self._session = session
self._params = params
async def setup(self, setup: FrameProcessorSetup):
await super().setup(setup)
await self._session.setup(setup.task_manager)
async def start(self, frame: StartFrame):
await super().start(frame)
if self._params.serializer:
await self._params.serializer.setup(frame)
await self._session.setup(frame)
await self._session.connect()
await self.set_transport_ready(frame)
@@ -184,12 +188,15 @@ class WebsocketClientOutputTransport(BaseOutputTransport):
self._send_interval = 0
self._next_send_time = 0
async def setup(self, setup: FrameProcessorSetup):
await super().setup(setup)
await self._session.setup(setup.task_manager)
async def start(self, frame: StartFrame):
await super().start(frame)
self._send_interval = (self.audio_chunk_size / self.sample_rate) / 2
if self._params.serializer:
await self._params.serializer.setup(frame)
await self._session.setup(frame)
await self._session.connect()
await self.set_transport_ready(frame)