Emit on_assistant_turn_stopped and on_user_turn_stopped from EndFrame or CancelFrame

This commit is contained in:
Mark Backman
2026-01-27 12:59:20 -05:00
parent f7a1c6b719
commit e80e0eab29
4 changed files with 124 additions and 15 deletions

1
changelog/3575.fixed.md Normal file
View File

@@ -0,0 +1 @@
- Fixed `LLMUserAggregator` and `LLMAssistantAggregator` not emitting pending transcripts via `on_user_turn_stopped` and `on_assistant_turn_stopped` events when the conversation ends (`EndFrame`) or is cancelled (`CancelFrame`).

View File

@@ -464,9 +464,11 @@ class LLMUserAggregator(LLMContextAggregator):
await s.setup(self.task_manager)
async def _stop(self, frame: EndFrame):
await self._maybe_emit_user_turn_stopped(on_session_end=True)
await self._cleanup()
async def _cancel(self, frame: CancelFrame):
await self._maybe_emit_user_turn_stopped(on_session_end=True)
await self._cleanup()
async def _cleanup(self):
@@ -602,14 +604,7 @@ class LLMUserAggregator(LLMContextAggregator):
if params.enable_user_speaking_frames:
await self.broadcast_frame(UserStoppedSpeakingFrame)
# Always push context frame.
aggregation = await self.push_aggregation()
message = UserTurnStoppedMessage(
content=aggregation, timestamp=self._user_turn_start_timestamp
)
await self._call_event_handler("on_user_turn_stopped", strategy, message)
self._user_turn_start_timestamp = ""
await self._maybe_emit_user_turn_stopped(strategy)
async def _on_user_turn_stop_timeout(self, controller):
await self._call_event_handler("on_user_turn_stop_timeout")
@@ -617,6 +612,26 @@ class LLMUserAggregator(LLMContextAggregator):
async def _on_user_turn_idle(self, controller):
await self._call_event_handler("on_user_turn_idle")
async def _maybe_emit_user_turn_stopped(
self,
strategy: Optional[BaseUserTurnStopStrategy] = None,
on_session_end: bool = False,
):
"""Maybe emit user turn stopped event.
Args:
strategy: The strategy that triggered the turn stop.
on_session_end: If True, only emit if there's unemitted content
(avoids duplicate events when session ends).
"""
aggregation = await self.push_aggregation()
if not on_session_end or aggregation:
message = UserTurnStoppedMessage(
content=aggregation, timestamp=self._user_turn_start_timestamp
)
await self._call_event_handler("on_user_turn_stopped", strategy, message)
self._user_turn_start_timestamp = ""
class LLMAssistantAggregator(LLMContextAggregator):
"""Assistant LLM aggregator that processes bot responses and function calls.
@@ -739,6 +754,9 @@ class LLMAssistantAggregator(LLMContextAggregator):
if isinstance(frame, InterruptionFrame):
await self._handle_interruptions(frame)
await self.push_frame(frame, direction)
elif isinstance(frame, (EndFrame, CancelFrame)):
await self._handle_end_or_cancel(frame)
await self.push_frame(frame, direction)
elif isinstance(frame, LLMFullResponseStartFrame):
await self._handle_llm_start(frame)
elif isinstance(frame, LLMFullResponseEndFrame):
@@ -813,6 +831,10 @@ class LLMAssistantAggregator(LLMContextAggregator):
self._started = 0
await self.reset()
async def _handle_end_or_cancel(self, frame: Frame):
await self._trigger_assistant_turn_stopped()
self._started = 0
async def _handle_function_calls_started(self, frame: FunctionCallsStartedFrame):
function_names = [f"{f.function_name}:{f.tool_call_id}" for f in frame.function_calls]
logger.debug(f"{self} FunctionCallsStartedFrame: {function_names}")

View File

@@ -344,6 +344,35 @@ class TestLLMUserAggregator(unittest.IsolatedAsyncioTestCase):
# The user mute strategies should have muted the user.
self.assertFalse(user_turn)
async def test_pending_transcription_emitted_on_end_frame(self):
"""Pending user transcription should be emitted when EndFrame arrives."""
context = LLMContext()
user_aggregator = LLMUserAggregator(context)
stop_messages = []
@user_aggregator.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(aggregator, strategy, message):
stop_messages.append((strategy, message))
pipeline = Pipeline([user_aggregator])
# Start turn and send transcription, but don't trigger normal turn stop
frames_to_send = [
VADUserStartedSpeakingFrame(),
TranscriptionFrame(text="Hello!", user_id="", timestamp="now"),
# No VADUserStoppedSpeakingFrame - turn doesn't stop normally
# EndFrame will be sent by run_test, triggering emission
]
await run_test(pipeline, frames_to_send=frames_to_send)
# The pending transcription should be emitted on EndFrame
self.assertEqual(len(stop_messages), 1)
strategy, message = stop_messages[0]
self.assertIsNone(strategy) # strategy is None for end/cancel
self.assertEqual(message.content, "Hello!")
class TestLLMAssistantAggregator(unittest.IsolatedAsyncioTestCase):
async def test_empty(self):
@@ -512,3 +541,28 @@ class TestLLMAssistantAggregator(unittest.IsolatedAsyncioTestCase):
]
await run_test(aggregator, frames_to_send=frames_to_send)
self.assertEqual(thought_message.content, "I'm thinking!")
async def test_pending_text_emitted_on_end_frame(self):
"""Pending assistant text should be emitted when EndFrame arrives."""
context = LLMContext()
aggregator = LLMAssistantAggregator(context)
stop_messages = []
@aggregator.event_handler("on_assistant_turn_stopped")
async def on_assistant_turn_stopped(aggregator, message: AssistantTurnStoppedMessage):
stop_messages.append(message)
# Start response and send text, but don't send LLMFullResponseEndFrame
frames_to_send = [
LLMFullResponseStartFrame(),
LLMTextFrame("Hello from Pipecat!"),
# No LLMFullResponseEndFrame - response doesn't end normally
# EndFrame will be sent by run_test, triggering emission
]
await run_test(aggregator, frames_to_send=frames_to_send)
# The pending text should be emitted on EndFrame
self.assertEqual(len(stop_messages), 1)
self.assertEqual(stop_messages[0].content, "Hello from Pipecat!")

46
uv.lock generated
View File

@@ -38,12 +38,44 @@ wheels = [
[[package]]
name = "aic-sdk"
version = "1.2.0"
version = "2.0.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "numpy" },
]
sdist = { url = "https://files.pythonhosted.org/packages/f9/ba/3ebe31b91e03d42437ec864e9d2af3a52b7ccc73a1a0c1026275956270b0/aic_sdk-1.2.0.tar.gz", hash = "sha256:eeda9a181c679f175dbe6f0efc0c67ec98ff3d84cfe01541fef7fa12ecd505ca", size = 35606, upload-time = "2025-11-20T14:42:14.333Z" }
sdist = { url = "https://files.pythonhosted.org/packages/68/c6/1f0b3d3d226c6d19ec654fdaea7859ee9931e0286735385b1f9ea4bcfba1/aic_sdk-2.0.1.tar.gz", hash = "sha256:2480d8398a26639ed7fb5175c37da82cf5e6b1138a1a301938cd8491fe461c20", size = 73091, upload-time = "2026-01-23T23:38:15.77Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/ae/cf/b2f56f3129b8e393362487b6828a6811cc2f252d438bbf53dc917fd53f23/aic_sdk-2.0.1-cp310-cp310-macosx_10_12_x86_64.whl", hash = "sha256:583e0b51d236d02396b9d13fce112bb63aa2b6953e42c925af093beea2b82edb", size = 4892239, upload-time = "2026-01-23T23:36:15.832Z" },
{ url = "https://files.pythonhosted.org/packages/92/bc/300366b9a64c97ca40db4d54a0ab8390f4c6860bf6cb5e1e0c55988aca1f/aic_sdk-2.0.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:ef80b2ef5d1f43ef28e117c7db3503e4877d532e12ebac79dd0c0a1944bc6a0a", size = 4449896, upload-time = "2026-01-23T23:36:20.784Z" },
{ url = "https://files.pythonhosted.org/packages/52/76/57e365ede8d4f88dbdce119ec6d8910d76c5e85e506ee3062a4a1222ea97/aic_sdk-2.0.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:4ee7b00bcf7eb870ef05bdefcb65eaf4894285155d454e85187c49f313978152", size = 3595181, upload-time = "2026-01-23T23:36:25.641Z" },
{ url = "https://files.pythonhosted.org/packages/c4/59/f6d92c34469ab54c74cbd59590d2f0f8247d2e576f0f97723e11004708ff/aic_sdk-2.0.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:05fb0b74a457f3749a90414304e1291fcb6ffb8019f3c59f39c2f395eabf902b", size = 4111674, upload-time = "2026-01-23T23:36:31.985Z" },
{ url = "https://files.pythonhosted.org/packages/ad/78/2fe743d9194f4a187ca72dd9e24d96c9f3687e11990f2ebb2f900719303e/aic_sdk-2.0.1-cp310-cp310-win_amd64.whl", hash = "sha256:559307bded02c0b64a00595ec8e5383bb7fe5e9b0865cd9b49e2b15411057f1a", size = 3663836, upload-time = "2026-01-23T23:36:36.322Z" },
{ url = "https://files.pythonhosted.org/packages/8b/e4/2f6bdd665b4d4da43e890f8849daf9661ef36c7304a4c675f3cbf617cb14/aic_sdk-2.0.1-cp310-cp310-win_arm64.whl", hash = "sha256:e4b64f289416779711cd083905abdd80fdb4f8a6802480b958951ded1517c6a5", size = 3275160, upload-time = "2026-01-23T23:36:39.014Z" },
{ url = "https://files.pythonhosted.org/packages/57/6f/2a065d61ed333e46a704f6592b33a88ffd0848b2efa99b039c8e427b21a3/aic_sdk-2.0.1-cp311-cp311-macosx_10_12_x86_64.whl", hash = "sha256:06aa50a7f014c8b06387cdea6fb37c53c9697490eab98959039aeccc8d51e360", size = 4892089, upload-time = "2026-01-23T23:36:41.249Z" },
{ url = "https://files.pythonhosted.org/packages/de/f7/cd0c82cec01a94d7e121d411780f43cb8e6611bd797a10c02fd02c858f49/aic_sdk-2.0.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:1f1ade29783354f09f270ce38649dd6aed57c237c1b090b2ddc0fb61bc651d47", size = 4449813, upload-time = "2026-01-23T23:36:43.845Z" },
{ url = "https://files.pythonhosted.org/packages/44/16/d90d39716cf487f0a41fd5bd01670884f9d0901902d6616595ad3ea17464/aic_sdk-2.0.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:17040ea4d6a686429a214a5673c362890ad10cefb265b6f878a240763e6f39ef", size = 3594996, upload-time = "2026-01-23T23:36:46.334Z" },
{ url = "https://files.pythonhosted.org/packages/21/5d/8852484f85fa60a8ec2e696f6de8363301cd6100b2e5a68289ccc36d02ca/aic_sdk-2.0.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f13f9b2211136dd6f46fa2f148a55aabf5b9c3cb40fc0beed9e435b0df60d34c", size = 4111589, upload-time = "2026-01-23T23:36:50.448Z" },
{ url = "https://files.pythonhosted.org/packages/71/ca/22c99be2aca92f77d4f0fe742827cd2db5f0c761797ebe0e5bd43872259a/aic_sdk-2.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:f4c3556bad0b74f2c0a5c2a253f14ca58b7129ce5b17848b8b0948f68286639d", size = 3663706, upload-time = "2026-01-23T23:36:54.581Z" },
{ url = "https://files.pythonhosted.org/packages/19/fc/fbd7ee793cf15ef3319d12399ee9300c21c09acf654e1d8d1f64f682d750/aic_sdk-2.0.1-cp311-cp311-win_arm64.whl", hash = "sha256:11de01064d028adeb2d2edda4546e86002d5b43710fcdc00a33ee2403a1676d4", size = 3274994, upload-time = "2026-01-23T23:36:58.177Z" },
{ url = "https://files.pythonhosted.org/packages/8b/04/07ed2ae4b4dc9f31522fa971791fd7d7e38feac8ce2b9d3316394b2e5fe5/aic_sdk-2.0.1-cp312-cp312-macosx_10_12_x86_64.whl", hash = "sha256:f48dde209a704a51e65a44c7846c033dc860003467cef0fc2d15d7f8aa137dbc", size = 4893276, upload-time = "2026-01-23T23:37:03.097Z" },
{ url = "https://files.pythonhosted.org/packages/58/87/6328bcf58e633acdf65fd72c4dee61f468fef399c0868e5c446b99166bf5/aic_sdk-2.0.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:2017ea843fc9e38612a13f1b0a668428a3f6862792baf230ac79a65d9c0633d9", size = 4450341, upload-time = "2026-01-23T23:37:08.648Z" },
{ url = "https://files.pythonhosted.org/packages/fc/59/da5138346944ac7dc61ed70e66c1fb2fddef815dc2bab561316db5aef252/aic_sdk-2.0.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:065954c17116b96408ebfbff29152ca458bb083a9e56178c70adbafdda08218a", size = 3594974, upload-time = "2026-01-23T23:37:13.83Z" },
{ url = "https://files.pythonhosted.org/packages/06/d8/17e1a77820a6848efb7c97751bc6022f65c5ca6436dc3caf3a9da356def1/aic_sdk-2.0.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:fa7a196160d6eaf2b856c542bc967c2e08e11a5d93ac4e632f843a01b2872274", size = 4113591, upload-time = "2026-01-23T23:37:19.067Z" },
{ url = "https://files.pythonhosted.org/packages/38/3b/04b70a75364c2ef1717018a81963a8e16bffc3f9f064f125cb111870b6a4/aic_sdk-2.0.1-cp312-cp312-win_amd64.whl", hash = "sha256:dbd007a683ebff4def95fa5a7ace1602aa2d150fa80761231b044edd57e98bbb", size = 3661883, upload-time = "2026-01-23T23:37:25.242Z" },
{ url = "https://files.pythonhosted.org/packages/2a/bd/64bc3ce090cc110f3721c5e54f97f9fcb67dc50bd8dc6408896650d1d68e/aic_sdk-2.0.1-cp312-cp312-win_arm64.whl", hash = "sha256:f776c5f0425b39073d4caca2f0bdea036647c4162d4673ef498e1306d41bb39e", size = 3271232, upload-time = "2026-01-23T23:37:30.005Z" },
{ url = "https://files.pythonhosted.org/packages/6e/72/8445a7201aa5969216b5d4ab60bb2ebefa2ac07f557e9ebca27172be2f00/aic_sdk-2.0.1-cp313-cp313-macosx_10_12_x86_64.whl", hash = "sha256:a7ff35422ffb813e8a5b4afed6eb56d4e8abc1ecabf464084d4c7b5b8aff0e43", size = 4892624, upload-time = "2026-01-23T23:37:33.229Z" },
{ url = "https://files.pythonhosted.org/packages/c7/9c/5b060cbd9e9bcea5e62df13cf3e722f4355286e2174c298ffcfe337c680d/aic_sdk-2.0.1-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:6cda0b6db664712099da483f803128e4e5256625aed2d85e65e5cc823a0e873b", size = 4449490, upload-time = "2026-01-23T23:37:35.938Z" },
{ url = "https://files.pythonhosted.org/packages/e4/f8/ac61d007dc8d158a8f516327db74b6f3b1cf78b16be43acd29775197533d/aic_sdk-2.0.1-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:7ade8754bd878da0509e70636a9b4eaba0280741db5afabf752102ef605ffaac", size = 3594360, upload-time = "2026-01-23T23:37:38.943Z" },
{ url = "https://files.pythonhosted.org/packages/1b/c4/af0c00055450b060b23e8dd5f3c1a208ea1444c6b497eaf29d3de6e215fa/aic_sdk-2.0.1-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:9b2f44c660aa29613be05c576da25092b3570c8fb3dddc09b70625789d066202", size = 4112325, upload-time = "2026-01-23T23:37:41.613Z" },
{ url = "https://files.pythonhosted.org/packages/86/8e/62a7c53cc1bf2345ea20b554d1d8a61058301cd8088ff94ba95809b04a02/aic_sdk-2.0.1-cp313-cp313-win_amd64.whl", hash = "sha256:ce1656991fc4dbbb40257c72a4b8fc4d4839363ca4b7b25a84ae5a83914ae90c", size = 3661441, upload-time = "2026-01-23T23:37:45.025Z" },
{ url = "https://files.pythonhosted.org/packages/39/98/aa9d6ccba0a1902f8480544fcf468dd3696ecb5392f02c2770f9020e6f9a/aic_sdk-2.0.1-cp313-cp313-win_arm64.whl", hash = "sha256:0138e964feb15d9fb5d2c9c64a8d45a807171900f53351e5525c26869237bd1c", size = 3270753, upload-time = "2026-01-23T23:37:47.882Z" },
{ url = "https://files.pythonhosted.org/packages/c0/3e/6a693ba223e2e55e142983c6243968222070405c6a90ec4c5a61b46652c1/aic_sdk-2.0.1-cp314-cp314-macosx_10_12_x86_64.whl", hash = "sha256:11eb0c3686ff83f340c875b864840fad19e3a98cd6e59815f83e9248a3ffb397", size = 4893527, upload-time = "2026-01-23T23:37:52.021Z" },
{ url = "https://files.pythonhosted.org/packages/35/9c/f149870d75f28c851de439d4039f85aa590f47499272f932841e4dc0a9a5/aic_sdk-2.0.1-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:27844521dc1ae3e1226e7371ec3e68fc4726515f14c5e82a6030f276b612c1a8", size = 4450169, upload-time = "2026-01-23T23:37:55.964Z" },
{ url = "https://files.pythonhosted.org/packages/92/87/8ee4e1763b603ad3d6d535d7ecfa7a2943145bcc18f2db4600279aa37af3/aic_sdk-2.0.1-cp314-cp314-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:adf617d4e4e8910764118d1baf1521c752218fd304c75d7e22352d4755cabd50", size = 3595300, upload-time = "2026-01-23T23:37:59.494Z" },
{ url = "https://files.pythonhosted.org/packages/6a/b4/06d6f5c1b45d839d4d8ad4fbcb45dc224e980c69976c48e39d5e32850c51/aic_sdk-2.0.1-cp314-cp314-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0e2bc9071599b9703783b6b100758cd7621b30a64abddc8072f6872932b74c21", size = 4113837, upload-time = "2026-01-23T23:38:03.565Z" },
{ url = "https://files.pythonhosted.org/packages/1e/35/d7a2f7b37183b08b2e8969c3d6c6d1824253cd32894d72f250075edee654/aic_sdk-2.0.1-cp314-cp314-win_amd64.whl", hash = "sha256:9db2bfb4f1ab40a4b130d8d0e158277461b25c7b78bffffba90f816766cb28e9", size = 3663055, upload-time = "2026-01-23T23:38:07.741Z" },
{ url = "https://files.pythonhosted.org/packages/90/97/9ed859e70b1d0c68edc9748c4e69d251e89a3faa462f36ce26c1f8aa7844/aic_sdk-2.0.1-cp314-cp314-win_arm64.whl", hash = "sha256:3c6ed1bfda589970e6c6b96ae29f112baa430ad91e149e76004825870198a5c7", size = 3272737, upload-time = "2026-01-23T23:38:13.966Z" },
]
[[package]]
name = "aioboto3"
@@ -4495,7 +4527,7 @@ docs = [
[package.metadata]
requires-dist = [
{ name = "accelerate", marker = "extra == 'moondream'", specifier = "~=1.10.0" },
{ name = "aic-sdk", marker = "extra == 'aic'", specifier = "~=1.2.0" },
{ name = "aic-sdk", marker = "extra == 'aic'", specifier = "~=2.0.1" },
{ name = "aioboto3", marker = "extra == 'aws'", specifier = "~=15.5.0" },
{ name = "aiofiles", specifier = ">=24.1.0,<25" },
{ name = "aiohttp", specifier = ">=3.11.12,<4" },
@@ -4586,7 +4618,7 @@ requires-dist = [
{ name = "simli-ai", marker = "extra == 'simli'", specifier = "~=1.0.3" },
{ name = "soundfile", marker = "extra == 'soundfile'", specifier = "~=0.13.1" },
{ name = "soxr", specifier = "~=0.5.0" },
{ name = "speechmatics-voice", extras = ["smart"], marker = "extra == 'speechmatics'", specifier = ">=0.2.6" },
{ name = "speechmatics-voice", extras = ["smart"], marker = "extra == 'speechmatics'", specifier = ">=0.2.8" },
{ name = "strands-agents", marker = "extra == 'strands'", specifier = ">=1.9.1,<2" },
{ name = "tenacity", marker = "extra == 'livekit'", specifier = ">=8.2.3,<10.0.0" },
{ name = "timm", marker = "extra == 'moondream'", specifier = "~=1.0.13" },
@@ -6420,16 +6452,16 @@ wheels = [
[[package]]
name = "speechmatics-voice"
version = "0.2.7"
version = "0.2.8"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "numpy" },
{ name = "pydantic" },
{ name = "speechmatics-rt" },
]
sdist = { url = "https://files.pythonhosted.org/packages/4a/94/47280c7fa5264676bfd6a2373c5cbfa562d5f1aefd77d7f241641a4889a6/speechmatics_voice-0.2.7.tar.gz", hash = "sha256:392b5129d2cbc0059f122fdf960d88dc59df5f26808992ef031f2eb40713c936", size = 61137, upload-time = "2026-01-12T14:21:17.672Z" }
sdist = { url = "https://files.pythonhosted.org/packages/e4/b2/72b5b2203bbefbd22e7692adaca0dd7c2feebed1aaea5599ec579f74fbbf/speechmatics_voice-0.2.8.tar.gz", hash = "sha256:b2d9cbf773fd94400c744734662e2b16b5bdc4271d0dafde46ac032c438fe000", size = 61419, upload-time = "2026-01-26T16:26:09.082Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/7e/72/e74dbcd42935b31b1d188b8f9d932d9d4078ea5edf303bb0ba0af4203ba2/speechmatics_voice-0.2.7-py3-none-any.whl", hash = "sha256:79c6072a5bf21cfa75770b5e3855cff5747222b024c417a276d0b9c2ae83cd0c", size = 57323, upload-time = "2026-01-12T14:21:16.679Z" },
{ url = "https://files.pythonhosted.org/packages/89/2d/a2ab215a7a31fad5ef9267420dc9ced96d6d52e5b80b131ef41424607849/speechmatics_voice-0.2.8-py3-none-any.whl", hash = "sha256:423ac7620ae8c98f175faace2184ac4ab1fe448ffb41af57aae05ec655326f79", size = 57629, upload-time = "2026-01-26T16:26:07.59Z" },
]
[package.optional-dependencies]