diff --git a/src/dailyai/services/daily_transport_service.py b/src/dailyai/services/daily_transport_service.py index bf3d16ca3..e18fbfc41 100644 --- a/src/dailyai/services/daily_transport_service.py +++ b/src/dailyai/services/daily_transport_service.py @@ -261,6 +261,7 @@ class DailyTransportService(EventHandler): for frame in frames: if frame.frame_type == FrameType.END_STREAM: self.logger.info("Stopping frame consumer thread") + self.output_queue.task_done() return # if interrupted, we just pull frames off the queue and discard them diff --git a/src/samples/theoretical-to-real/01-say-one-thing.py b/src/samples/theoretical-to-real/01-say-one-thing.py index d90bc212e..50b7e01f3 100644 --- a/src/samples/theoretical-to-real/01-say-one-thing.py +++ b/src/samples/theoretical-to-real/01-say-one-thing.py @@ -39,6 +39,13 @@ async def main(room_url): async for audio in audio_generator: transport.output_queue.put(OutputQueueFrame(FrameType.AUDIO_FRAME, audio)) + # Put an "end stream" item on the queue, which will cause it to shut down when it's processed + # all the audio. + transport.output_queue.put(OutputQueueFrame(FrameType.END_STREAM, None)) + transport.output_queue.join() + + transport.stop() + await transport.run() diff --git a/src/samples/theoretical-to-real/02-llm-say-one-thing.py b/src/samples/theoretical-to-real/02-llm-say-one-thing.py index b9d0aeea3..02ede4457 100644 --- a/src/samples/theoretical-to-real/02-llm-say-one-thing.py +++ b/src/samples/theoretical-to-real/02-llm-say-one-thing.py @@ -32,6 +32,9 @@ async def main(room_url): @transport.event_handler("on_participant_joined") async def on_participant_joined(transport, participant): + if participant["id"] == transport.my_participant_id: + return + current_text = "" async for text in llm_generator: current_text += text @@ -40,6 +43,13 @@ async def main(room_url): transport.output_queue.put(OutputQueueFrame(FrameType.AUDIO_FRAME, audio)) current_text = "" + # Put an "end stream" item on the queue, which will cause it to shut down when it's processed + # all the audio. + transport.output_queue.put(OutputQueueFrame(FrameType.END_STREAM, None)) + transport.output_queue.join() + + transport.stop() + await transport.run()