diff --git a/src/dailyai/pipeline/frames.py b/src/dailyai/pipeline/frames.py index d0d279bd9..09f206c87 100644 --- a/src/dailyai/pipeline/frames.py +++ b/src/dailyai/pipeline/frames.py @@ -95,9 +95,19 @@ class OpenAILLMContextFrame(Frame): context: OpenAILLMContext -class AppMessageQueueFrame(Frame): +@dataclass() +class ReceivedAppMessageFrame(Frame): message: Any - participantId: str + sender: str + + def __str__(self): + return f"ReceivedAppMessageFrame: sender: {self.sender}, message: {self.message}" + + +@dataclass() +class SendAppMessageFrame(Frame): + message: Any + participantId: str | None class UserStartedSpeakingFrame(Frame): diff --git a/src/dailyai/services/base_transport_service.py b/src/dailyai/services/base_transport_service.py index a24a5dc48..316ba7be2 100644 --- a/src/dailyai/services/base_transport_service.py +++ b/src/dailyai/services/base_transport_service.py @@ -8,11 +8,13 @@ import torch import queue import threading import time -from typing import AsyncGenerator +from typing import Any, AsyncGenerator from enum import Enum from dailyai.pipeline.frame_processor import FrameProcessor from dailyai.pipeline.frames import ( + ReceivedAppMessageFrame, + SendAppMessageFrame, AudioFrame, EndFrame, ImageFrame, @@ -317,9 +319,10 @@ class BaseTransportService: self._vad_state == VADState.STARTING and self._vad_starting_count >= self._vad_start_frames ): - asyncio.run_coroutine_threadsafe( - self.receive_queue.put(UserStartedSpeakingFrame()), self._loop - ) + if self._loop: + asyncio.run_coroutine_threadsafe( + self.receive_queue.put(UserStartedSpeakingFrame()), self._loop + ) # self.interrupt() self._vad_state = VADState.SPEAKING self._vad_starting_count = 0 @@ -327,9 +330,10 @@ class BaseTransportService: self._vad_state == VADState.STOPPING and self._vad_stopping_count >= self._vad_stop_frames ): - asyncio.run_coroutine_threadsafe( - self.receive_queue.put(UserStoppedSpeakingFrame()), self._loop - ) + if self._loop: + asyncio.run_coroutine_threadsafe( + self.receive_queue.put(UserStoppedSpeakingFrame()), self._loop + ) self._vad_state = VADState.QUIET self._vad_stopping_count = 0 @@ -375,6 +379,10 @@ class BaseTransportService: def _set_images(self, images: list[bytes], start_frame=0): self._images = itertools.cycle(images) + def send_app_message(self, message: Any, participantId:str|None): + """ Child classes should override this to send a custom message to the room. """ + pass + def _run_camera(self): try: while not self._stop_threads.is_set(): @@ -440,6 +448,8 @@ class BaseTransportService: self._set_image(frame.image) elif isinstance(frame, SpriteFrame): self._set_images(frame.images) + elif isinstance(frame, SendAppMessageFrame): + self.send_app_message(frame.message, frame.participantId) elif len(b): self.write_frame_to_mic(bytes(b)) b = bytearray() diff --git a/src/dailyai/services/daily_transport_service.py b/src/dailyai/services/daily_transport_service.py index 16c8519ba..aacb74a3c 100644 --- a/src/dailyai/services/daily_transport_service.py +++ b/src/dailyai/services/daily_transport_service.py @@ -6,8 +6,10 @@ import threading import types from functools import partial +from typing import Any from dailyai.pipeline.frames import ( + ReceivedAppMessageFrame, TranscriptionQueueFrame, ) @@ -124,6 +126,9 @@ class DailyTransportService(BaseTransportService, EventHandler): def write_frame_to_mic(self, frame: bytes): self.mic.write_frames(frame) + def send_app_message(self, message: Any, participantId: str | None): + self.client.send_app_message(message, participantId) + def read_audio_frames(self, desired_frame_count): bytes = self._speaker.read_frames(desired_frame_count) return bytes @@ -219,7 +224,7 @@ class DailyTransportService(BaseTransportService, EventHandler): pass def call_joined(self, join_data, client_error): - #self._logger.info(f"Call_joined: {join_data}, {client_error}") + # self._logger.info(f"Call_joined: {join_data}, {client_error}") pass def dialout(self, number): @@ -243,8 +248,13 @@ class DailyTransportService(BaseTransportService, EventHandler): if len(self.client.participants()) < self._min_others_count + 1: self._stop_threads.set() - def on_app_message(self, message, sender): - pass + def on_app_message(self, message:Any, sender:str): + if self._loop: + frame = ReceivedAppMessageFrame(message, sender) + print(frame) + asyncio.run_coroutine_threadsafe( + self.receive_queue.put(frame), self._loop + ) def on_transcription_message(self, message: dict): if self._loop: