Support for app messages

This commit is contained in:
Moishe Lettvin
2024-03-18 10:08:41 -04:00
parent c6dfcb6f7a
commit 8e61fe8e36
3 changed files with 42 additions and 12 deletions

View File

@@ -95,9 +95,19 @@ class OpenAILLMContextFrame(Frame):
context: OpenAILLMContext
class AppMessageQueueFrame(Frame):
@dataclass()
class ReceivedAppMessageFrame(Frame):
message: Any
participantId: str
sender: str
def __str__(self):
return f"ReceivedAppMessageFrame: sender: {self.sender}, message: {self.message}"
@dataclass()
class SendAppMessageFrame(Frame):
message: Any
participantId: str | None
class UserStartedSpeakingFrame(Frame):

View File

@@ -8,11 +8,13 @@ import torch
import queue
import threading
import time
from typing import AsyncGenerator
from typing import Any, AsyncGenerator
from enum import Enum
from dailyai.pipeline.frame_processor import FrameProcessor
from dailyai.pipeline.frames import (
ReceivedAppMessageFrame,
SendAppMessageFrame,
AudioFrame,
EndFrame,
ImageFrame,
@@ -317,9 +319,10 @@ class BaseTransportService:
self._vad_state == VADState.STARTING
and self._vad_starting_count >= self._vad_start_frames
):
asyncio.run_coroutine_threadsafe(
self.receive_queue.put(UserStartedSpeakingFrame()), self._loop
)
if self._loop:
asyncio.run_coroutine_threadsafe(
self.receive_queue.put(UserStartedSpeakingFrame()), self._loop
)
# self.interrupt()
self._vad_state = VADState.SPEAKING
self._vad_starting_count = 0
@@ -327,9 +330,10 @@ class BaseTransportService:
self._vad_state == VADState.STOPPING
and self._vad_stopping_count >= self._vad_stop_frames
):
asyncio.run_coroutine_threadsafe(
self.receive_queue.put(UserStoppedSpeakingFrame()), self._loop
)
if self._loop:
asyncio.run_coroutine_threadsafe(
self.receive_queue.put(UserStoppedSpeakingFrame()), self._loop
)
self._vad_state = VADState.QUIET
self._vad_stopping_count = 0
@@ -375,6 +379,10 @@ class BaseTransportService:
def _set_images(self, images: list[bytes], start_frame=0):
self._images = itertools.cycle(images)
def send_app_message(self, message: Any, participantId:str|None):
""" Child classes should override this to send a custom message to the room. """
pass
def _run_camera(self):
try:
while not self._stop_threads.is_set():
@@ -440,6 +448,8 @@ class BaseTransportService:
self._set_image(frame.image)
elif isinstance(frame, SpriteFrame):
self._set_images(frame.images)
elif isinstance(frame, SendAppMessageFrame):
self.send_app_message(frame.message, frame.participantId)
elif len(b):
self.write_frame_to_mic(bytes(b))
b = bytearray()

View File

@@ -6,8 +6,10 @@ import threading
import types
from functools import partial
from typing import Any
from dailyai.pipeline.frames import (
ReceivedAppMessageFrame,
TranscriptionQueueFrame,
)
@@ -124,6 +126,9 @@ class DailyTransportService(BaseTransportService, EventHandler):
def write_frame_to_mic(self, frame: bytes):
self.mic.write_frames(frame)
def send_app_message(self, message: Any, participantId: str | None):
self.client.send_app_message(message, participantId)
def read_audio_frames(self, desired_frame_count):
bytes = self._speaker.read_frames(desired_frame_count)
return bytes
@@ -219,7 +224,7 @@ class DailyTransportService(BaseTransportService, EventHandler):
pass
def call_joined(self, join_data, client_error):
#self._logger.info(f"Call_joined: {join_data}, {client_error}")
# self._logger.info(f"Call_joined: {join_data}, {client_error}")
pass
def dialout(self, number):
@@ -243,8 +248,13 @@ class DailyTransportService(BaseTransportService, EventHandler):
if len(self.client.participants()) < self._min_others_count + 1:
self._stop_threads.set()
def on_app_message(self, message, sender):
pass
def on_app_message(self, message:Any, sender:str):
if self._loop:
frame = ReceivedAppMessageFrame(message, sender)
print(frame)
asyncio.run_coroutine_threadsafe(
self.receive_queue.put(frame), self._loop
)
def on_transcription_message(self, message: dict):
if self._loop: