From d8947c68a9144d77c5aec1b2cedc2bcfd1cf86e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 20 May 2026 15:57:29 -0700 Subject: [PATCH] Rename BaseTask.send_message to send_bus_message Mirrors on_bus_message and makes it explicit that the call goes out on the task bus, not on a transport (transports have their own send_message for client/peer messaging). --- src/pipecat/pipeline/base_task.py | 38 ++++++++++----------- src/pipecat/tasks/proxy/websocket/client.py | 6 ++-- src/pipecat/tasks/proxy/websocket/server.py | 4 +-- tests/test_websocket_proxy.py | 16 ++++----- 4 files changed, 32 insertions(+), 32 deletions(-) diff --git a/src/pipecat/pipeline/base_task.py b/src/pipecat/pipeline/base_task.py index 046e80ca6..4f22b73a6 100644 --- a/src/pipecat/pipeline/base_task.py +++ b/src/pipecat/pipeline/base_task.py @@ -360,11 +360,11 @@ class BaseTask(BaseObject, BusSubscriber): Args: reason: Optional human-readable reason for ending. """ - await self.send_message(BusEndMessage(source=self.name, reason=reason)) + await self.send_bus_message(BusEndMessage(source=self.name, reason=reason)) async def cancel(self) -> None: """Request an immediate cancellation of all tasks.""" - await self.send_message(BusCancelMessage(source=self.name)) + await self.send_bus_message(BusCancelMessage(source=self.name)) async def wait(self) -> None: """Wait for this task to finish.""" @@ -520,7 +520,7 @@ class BaseTask(BaseObject, BusSubscriber): """ pass - async def send_message(self, message: BusMessage) -> None: + async def send_bus_message(self, message: BusMessage) -> None: """Send a message on the bus. Args: @@ -539,9 +539,9 @@ class BaseTask(BaseObject, BusSubscriber): error: Description of the error. """ if self._parent: - await self.send_message(BusTaskLocalErrorMessage(source=self.name, error=error)) + await self.send_bus_message(BusTaskLocalErrorMessage(source=self.name, error=error)) else: - await self.send_message(BusTaskErrorMessage(source=self.name, error=error)) + await self.send_bus_message(BusTaskErrorMessage(source=self.name, error=error)) async def add_task(self, task: "BaseTask") -> None: """Register a child task under this parent. @@ -558,7 +558,7 @@ class BaseTask(BaseObject, BusSubscriber): return task._parent = self.name self._children.append(task) - await self.send_message(BusAddTaskMessage(source=self.name, task=task)) + await self.send_bus_message(BusAddTaskMessage(source=self.name, task=task)) async def activate_task( self, @@ -581,7 +581,7 @@ class BaseTask(BaseObject, BusSubscriber): """ if self._active and deactivate_self: await self.deactivate_task(self.name) - await self.send_message( + await self.send_bus_message( BusActivateTaskMessage( source=self.name, target=task_name, args=args.to_dict() if args else None ) @@ -595,7 +595,7 @@ class BaseTask(BaseObject, BusSubscriber): Args: task_name: The name of the task to deactivate. """ - await self.send_message(BusDeactivateTaskMessage(source=self.name, target=task_name)) + await self.send_bus_message(BusDeactivateTaskMessage(source=self.name, target=task_name)) async def watch_task(self, task_name: str) -> None: """Request notification when a task registers. @@ -798,7 +798,7 @@ class BaseTask(BaseObject, BusSubscriber): if group.timeout_task: await self.cancel_task(group.timeout_task) for task_name in group.task_names: - await self.send_message( + await self.send_bus_message( BusJobCancelMessage( source=self.name, target=task_name, job_id=job_id, reason=reason ) @@ -812,7 +812,7 @@ class BaseTask(BaseObject, BusSubscriber): job_id: The job identifier. task_name: The name of the worker to request an update from. """ - await self.send_message( + await self.send_bus_message( BusJobUpdateRequestMessage(source=self.name, target=task_name, job_id=job_id) ) @@ -886,7 +886,7 @@ class BaseTask(BaseObject, BusSubscriber): if request is None: raise RuntimeError(f"Task '{self}': no active job '{job_id}' to respond to") msg_class = BusJobResponseUrgentMessage if urgent else BusJobResponseMessage - await self.send_message( + await self.send_bus_message( msg_class( source=self.name, target=request.source, @@ -915,7 +915,7 @@ class BaseTask(BaseObject, BusSubscriber): if request is None: raise RuntimeError(f"Task '{self}': no active job '{job_id}' to update") msg_class = BusJobUpdateUrgentMessage if urgent else BusJobUpdateMessage - await self.send_message( + await self.send_bus_message( msg_class( source=self.name, target=request.source, @@ -937,7 +937,7 @@ class BaseTask(BaseObject, BusSubscriber): request = self._active_jobs.get(job_id) if request is None: raise RuntimeError(f"Task '{self}': no active job '{job_id}' to stream") - await self.send_message( + await self.send_bus_message( BusJobStreamStartMessage( source=self.name, target=request.source, @@ -959,7 +959,7 @@ class BaseTask(BaseObject, BusSubscriber): request = self._active_jobs.get(job_id) if request is None: raise RuntimeError(f"Task '{self}': no active job '{job_id}' to stream") - await self.send_message( + await self.send_bus_message( BusJobStreamDataMessage( source=self.name, target=request.source, @@ -981,7 +981,7 @@ class BaseTask(BaseObject, BusSubscriber): request = self._active_jobs.get(job_id) if request is None: raise RuntimeError(f"Task '{self}': no active job '{job_id}' to stream") - await self.send_message( + await self.send_bus_message( BusJobStreamEndMessage( source=self.name, target=request.source, @@ -1001,7 +1001,7 @@ class BaseTask(BaseObject, BusSubscriber): # fires watchers synchronously, which may send additional # messages (e.g. ActivateTask). Sending the ready message # first preserves correct chronological order for observers. - await self.send_message( + await self.send_bus_message( BusTaskReadyMessage( source=self.name, runner=self._registry.runner_name, @@ -1111,7 +1111,7 @@ class BaseTask(BaseObject, BusSubscriber): async def _propagate_end_to_children(self, message: BusEndTaskMessage) -> None: """Forward a ``BusEndTaskMessage`` to each child and wait for them.""" for child in self._children: - await self.send_message( + await self.send_bus_message( BusEndTaskMessage(source=self.name, target=child.name, reason=message.reason) ) await asyncio.gather(*(child.wait() for child in self._children)) @@ -1119,7 +1119,7 @@ class BaseTask(BaseObject, BusSubscriber): async def _propagate_cancel_to_children(self, message: BusCancelTaskMessage) -> None: """Forward a ``BusCancelTaskMessage`` to each child.""" for child in self._children: - await self.send_message( + await self.send_bus_message( BusCancelTaskMessage(source=self.name, target=child.name, reason=message.reason) ) @@ -1298,7 +1298,7 @@ class BaseTask(BaseObject, BusSubscriber): job_name: str | None = None, payload: dict | None = None, ) -> None: - await self.send_message( + await self.send_bus_message( BusJobRequestMessage( source=self.name, target=task_name, diff --git a/src/pipecat/tasks/proxy/websocket/client.py b/src/pipecat/tasks/proxy/websocket/client.py index f9581758c..bf63dd9d3 100644 --- a/src/pipecat/tasks/proxy/websocket/client.py +++ b/src/pipecat/tasks/proxy/websocket/client.py @@ -188,13 +188,13 @@ class WebSocketProxyClientTask(BaseTask): logger.trace( f"Task '{self}': received registry from remote: {message.tasks}" ) - await self.send_message(message) + await self.send_bus_message(message) continue # Accept additional message types (e.g. BusFrameMessage). if self._forward_messages and isinstance(message, self._forward_messages): logger.trace(f"Task '{self}': received {message} from remote") - await self.send_message(message) + await self.send_bus_message(message) continue # Only accept other messages targeted at the local task. @@ -206,7 +206,7 @@ class WebSocketProxyClientTask(BaseTask): continue logger.trace(f"Task '{self}': received {message} from remote") - await self.send_message(message) + await self.send_bus_message(message) except Exception: logger.exception(f"Task '{self}': failed to deserialize remote message") except websockets.exceptions.ConnectionClosed: diff --git a/src/pipecat/tasks/proxy/websocket/server.py b/src/pipecat/tasks/proxy/websocket/server.py index b14b91672..3708aa03c 100644 --- a/src/pipecat/tasks/proxy/websocket/server.py +++ b/src/pipecat/tasks/proxy/websocket/server.py @@ -197,7 +197,7 @@ class WebSocketProxyServerTask(BaseTask): # Accept additional message types (e.g. BusFrameMessage). if self._forward_messages and isinstance(message, self._forward_messages): logger.trace(f"Task '{self}': received {message} from client") - await self.send_message(message) + await self.send_bus_message(message) continue # Only accept other messages targeted at the local task. @@ -209,7 +209,7 @@ class WebSocketProxyServerTask(BaseTask): continue logger.trace(f"Task '{self}': received {message} from client") - await self.send_message(message) + await self.send_bus_message(message) except Exception: logger.exception(f"Task '{self}': failed to deserialize client message") except WebSocketDisconnect: diff --git a/tests/test_websocket_proxy.py b/tests/test_websocket_proxy.py index 4bbd82f53..b0f630e10 100644 --- a/tests/test_websocket_proxy.py +++ b/tests/test_websocket_proxy.py @@ -154,13 +154,13 @@ class TestWebSocketProxyClientTask(unittest.IsolatedAsyncioTestCase): task = await self._create_client(fake_ws) sent_to_bus = [] - original_send = task.send_message + original_send = task.send_bus_message async def capture_send(message): sent_to_bus.append(message) await original_send(message) - task.send_message = capture_send + task.send_bus_message = capture_send msg = BusDataMessage(source="worker", target="voice") fake_ws.inject(self.serializer.serialize(msg)) @@ -183,13 +183,13 @@ class TestWebSocketProxyClientTask(unittest.IsolatedAsyncioTestCase): task = await self._create_client(fake_ws) sent_to_bus = [] - original_send = task.send_message + original_send = task.send_bus_message async def capture_send(message): sent_to_bus.append(message) await original_send(message) - task.send_message = capture_send + task.send_bus_message = capture_send msg = BusDataMessage(source="worker", target="other_task") fake_ws.inject(self.serializer.serialize(msg)) @@ -264,13 +264,13 @@ class TestWebSocketProxyServerTask(unittest.IsolatedAsyncioTestCase): task = await self._create_server(fake_ws) sent_to_bus = [] - original_send = task.send_message + original_send = task.send_bus_message async def capture_send(message): sent_to_bus.append(message) await original_send(message) - task.send_message = capture_send + task.send_bus_message = capture_send msg = BusDataMessage(source="voice", target="worker") fake_ws.inject(self.serializer.serialize(msg)) @@ -292,13 +292,13 @@ class TestWebSocketProxyServerTask(unittest.IsolatedAsyncioTestCase): task = await self._create_server(fake_ws) sent_to_bus = [] - original_send = task.send_message + original_send = task.send_bus_message async def capture_send(message): sent_to_bus.append(message) await original_send(message) - task.send_message = capture_send + task.send_bus_message = capture_send msg = BusDataMessage(source="voice", target="other_task") fake_ws.inject(self.serializer.serialize(msg))