diff --git a/engine/adapters/__init__.py b/engine/adapters/__init__.py new file mode 100644 index 0000000..6a94df0 --- /dev/null +++ b/engine/adapters/__init__.py @@ -0,0 +1 @@ +"""Adapters package.""" diff --git a/engine/adapters/control_plane/__init__.py b/engine/adapters/control_plane/__init__.py new file mode 100644 index 0000000..06f5004 --- /dev/null +++ b/engine/adapters/control_plane/__init__.py @@ -0,0 +1 @@ +"""Control-plane adapters package.""" diff --git a/engine/app/backend_adapters.py b/engine/adapters/control_plane/backend.py similarity index 100% rename from engine/app/backend_adapters.py rename to engine/adapters/control_plane/backend.py diff --git a/engine/app/main.py b/engine/app/main.py index 5625061..09ffa1d 100644 --- a/engine/app/main.py +++ b/engine/app/main.py @@ -20,11 +20,11 @@ except ImportError: logger.warning("aiortc not available - WebRTC endpoint will be disabled") from app.config import settings -from app.backend_adapters import build_backend_adapter_from_settings -from core.transports import SocketTransport, WebRtcTransport, BaseTransport -from core.session import Session +from adapters.control_plane.backend import build_backend_adapter_from_settings +from runtime.transports import SocketTransport, WebRtcTransport, BaseTransport +from runtime.session.manager import Session from processors.tracks import Resampled16kTrack -from core.events import get_event_bus, reset_event_bus +from runtime.events import get_event_bus, reset_event_bus # Check interval for heartbeat/timeout (seconds) _HEARTBEAT_CHECK_INTERVAL_SEC = 5 diff --git a/engine/core/__init__.py b/engine/core/__init__.py deleted file mode 100644 index 0110686..0000000 --- a/engine/core/__init__.py +++ /dev/null @@ -1,20 +0,0 @@ -"""Core Components Package""" - -from core.events import EventBus, get_event_bus -from core.transports import BaseTransport, SocketTransport, WebRtcTransport -from core.session import Session -from core.conversation import ConversationManager, ConversationState, ConversationTurn -from core.duplex_pipeline import DuplexPipeline - -__all__ = [ - "EventBus", - "get_event_bus", - "BaseTransport", - "SocketTransport", - "WebRtcTransport", - "Session", - "ConversationManager", - "ConversationState", - "ConversationTurn", - "DuplexPipeline", -] diff --git a/engine/docs/backend_integration.md b/engine/docs/backend_integration.md index bcd44fc..22fa09e 100644 --- a/engine/docs/backend_integration.md +++ b/engine/docs/backend_integration.md @@ -27,15 +27,15 @@ Assistant config source behavior: ## Architecture -- Ports: `core/ports/control_plane.py` -- Adapters: `app/backend_adapters.py` +- Ports: `runtime/ports/control_plane.py` +- Adapters: `adapters/control_plane/backend.py` `Session` and `DuplexPipeline` receive backend capabilities via injected adapter methods instead of hard-coding backend client imports. ## Async History Writes -Session history persistence is handled by `core/history_bridge.py`. +Session history persistence is handled by `runtime/history/bridge.py`. Design: diff --git a/engine/docs/extension_ports.md b/engine/docs/extension_ports.md index 47f596a..8566194 100644 --- a/engine/docs/extension_ports.md +++ b/engine/docs/extension_ports.md @@ -4,31 +4,31 @@ This document defines the draft port set used to keep core runtime extensible. ## Port Modules -- `core/ports/control_plane.py` +- `runtime/ports/control_plane.py` - `AssistantRuntimeConfigProvider` - `ConversationHistoryStore` - `KnowledgeRetriever` - `ToolCatalog` - `ControlPlaneGateway` -- `core/ports/llm.py` +- `runtime/ports/llm.py` - `LLMServiceSpec` - `LLMPort` - optional extensions: `LLMCancellable`, `LLMRuntimeConfigurable` -- `core/ports/tts.py` +- `runtime/ports/tts.py` - `TTSServiceSpec` - `TTSPort` -- `core/ports/asr.py` +- `runtime/ports/asr.py` - `ASRServiceSpec` - `ASRPort` - optional extensions: `ASRInterimControl`, `ASRBufferControl` -- `core/ports/service_factory.py` +- `runtime/ports/service_factory.py` - `RealtimeServiceFactory` ## Adapter Layer -- `app/service_factory.py` provides `DefaultRealtimeServiceFactory`. +- `providers/factory/default.py` provides `DefaultRealtimeServiceFactory`. - It maps resolved provider specs to concrete adapters. -- Core orchestration (`core/duplex_pipeline.py`) depends on the factory port/specs, not concrete provider classes. +- Runtime orchestration (`runtime/pipeline/duplex.py`) depends on the factory port/specs, not concrete provider classes. ## Provider Behavior (Current) diff --git a/engine/docs/high_level_architecture.md b/engine/docs/high_level_architecture.md index 91e6845..bdae564 100644 --- a/engine/docs/high_level_architecture.md +++ b/engine/docs/high_level_architecture.md @@ -14,19 +14,19 @@ This document describes the runtime architecture of `engine` for realtime voice/ ```mermaid flowchart LR C[Client\nWeb / Mobile / Device] <-- WS v1 + PCM --> A[FastAPI App\napp/main.py] - A --> S[Session\ncore/session.py] - S --> D[Duplex Pipeline\ncore/duplex_pipeline.py] + A --> S[Session\nruntime/session/manager.py] + S --> D[Duplex Pipeline\nruntime/pipeline/duplex.py] D --> P[Processors\nVAD / EOU / Tracks] - D --> R[Workflow Runner\ncore/workflow_runner.py] - D --> E[Event Bus + Models\ncore/events.py + models/*] + D --> R[Workflow Runner\nworkflow/runner.py] + D --> E[Event Bus + Models\nruntime/events.py + protocol/ws_v1/*] - R --> SV[Service Layer\nservices/asr.py\nservices/llm.py\nservices/tts.py] - R --> TE[Tool Executor\ncore/tool_executor.py] + R --> SV[Service Layer\nproviders/asr/*\nproviders/llm/*\nproviders/tts/*] + R --> TE[Tool Executor\ntools/executor.py] - S --> HB[History Bridge\ncore/history_bridge.py] - S --> BA[Control Plane Port\ncore/ports/control_plane.py] - BA --> AD[Adapters\napp/backend_adapters.py] + S --> HB[History Bridge\nruntime/history/bridge.py] + S --> BA[Control Plane Port\nruntime/ports/control_plane.py] + BA --> AD[Adapters\nadapters/control_plane/backend.py] AD --> B[(External Backend API\noptional)] SV --> M[(ASR/LLM/TTS Providers)] @@ -58,7 +58,7 @@ flowchart LR ### 2) Session + Orchestration Layer -- Core: `core/session.py`, `core/duplex_pipeline.py`, `core/conversation.py` +- Core: `runtime/session/manager.py`, `runtime/pipeline/duplex.py`, `runtime/conversation.py` - Responsibilities: - Per-session state machine - Turn boundaries and interruption/cancel handling @@ -75,7 +75,7 @@ flowchart LR ### 4) Workflow + Tooling Layer -- Modules: `core/workflow_runner.py`, `core/tool_executor.py` +- Modules: `workflow/runner.py`, `tools/executor.py` - Responsibilities: - Assistant workflow execution - Tool call planning/execution and timeout handling @@ -83,7 +83,7 @@ flowchart LR ### 5) Service Integration Layer -- Modules: `services/*` +- Modules: `providers/*` - Responsibilities: - Abstracting ASR/LLM/TTS provider differences - Streaming token/audio adaptation @@ -91,8 +91,8 @@ flowchart LR ### 6) Backend Integration Layer (Optional) -- Port: `core/ports/control_plane.py` -- Adapters: `app/backend_adapters.py` +- Port: `runtime/ports/control_plane.py` +- Adapters: `adapters/control_plane/backend.py` - Responsibilities: - Fetching assistant runtime config - Persisting call/session metadata and history @@ -100,7 +100,7 @@ flowchart LR ### 7) Persistence / Reliability Layer -- Module: `core/history_bridge.py` +- Module: `runtime/history/bridge.py` - Responsibilities: - Non-blocking queue-based history writes - Retry with backoff on backend failures diff --git a/engine/docs/import_migration.md b/engine/docs/import_migration.md new file mode 100644 index 0000000..eaeba1c --- /dev/null +++ b/engine/docs/import_migration.md @@ -0,0 +1,21 @@ +# Canonical Module Layout + +This MVP uses a single canonical module layout without legacy import shims. + +## Runtime and protocol + +- `protocol.ws_v1.schema` +- `runtime.session.manager` +- `runtime.pipeline.duplex` +- `runtime.history.bridge` +- `runtime.events` +- `runtime.transports` +- `runtime.conversation` +- `runtime.ports.*` + +## Integrations and orchestration + +- `providers.*` +- `adapters.control_plane.backend` +- `workflow.runner` +- `tools.executor` diff --git a/engine/docs/ws_v1_schema_zh.md b/engine/docs/ws_v1_schema_zh.md index ce5f175..1681c23 100644 --- a/engine/docs/ws_v1_schema_zh.md +++ b/engine/docs/ws_v1_schema_zh.md @@ -7,9 +7,9 @@ - 握手顺序、状态机、错误语义与实现细节。 实现对照来源: -- `models/ws_v1.py` -- `core/session.py` -- `core/duplex_pipeline.py` +- `protocol/ws_v1/schema.py` +- `runtime/session/manager.py` +- `runtime/pipeline/duplex.py` - `app/main.py` --- diff --git a/engine/models/__init__.py b/engine/models/__init__.py deleted file mode 100644 index 924d5fd..0000000 --- a/engine/models/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""Data Models Package""" diff --git a/engine/models/commands.py b/engine/models/commands.py deleted file mode 100644 index 5bcf47e..0000000 --- a/engine/models/commands.py +++ /dev/null @@ -1,143 +0,0 @@ -"""Protocol command models matching the original active-call API.""" - -from typing import Optional, Dict, Any -from pydantic import BaseModel, Field - - -class InviteCommand(BaseModel): - """Invite command to initiate a call.""" - - command: str = Field(default="invite", description="Command type") - option: Optional[Dict[str, Any]] = Field(default=None, description="Call configuration options") - - -class AcceptCommand(BaseModel): - """Accept command to accept an incoming call.""" - - command: str = Field(default="accept", description="Command type") - option: Optional[Dict[str, Any]] = Field(default=None, description="Call configuration options") - - -class RejectCommand(BaseModel): - """Reject command to reject an incoming call.""" - - command: str = Field(default="reject", description="Command type") - reason: str = Field(default="", description="Reason for rejection") - code: Optional[int] = Field(default=None, description="SIP response code") - - -class RingingCommand(BaseModel): - """Ringing command to send ringing response.""" - - command: str = Field(default="ringing", description="Command type") - recorder: Optional[Dict[str, Any]] = Field(default=None, description="Call recording configuration") - early_media: bool = Field(default=False, description="Enable early media") - ringtone: Optional[str] = Field(default=None, description="Custom ringtone URL") - - -class TTSCommand(BaseModel): - """TTS command to convert text to speech.""" - - command: str = Field(default="tts", description="Command type") - text: str = Field(..., description="Text to synthesize") - speaker: Optional[str] = Field(default=None, description="Speaker voice name") - play_id: Optional[str] = Field(default=None, description="Unique identifier for this TTS session") - auto_hangup: bool = Field(default=False, description="Auto hangup after TTS completion") - streaming: bool = Field(default=False, description="Streaming text input") - end_of_stream: bool = Field(default=False, description="End of streaming input") - wait_input_timeout: Optional[int] = Field(default=None, description="Max time to wait for input (seconds)") - option: Optional[Dict[str, Any]] = Field(default=None, description="TTS provider specific options") - - -class PlayCommand(BaseModel): - """Play command to play audio from URL.""" - - command: str = Field(default="play", description="Command type") - url: str = Field(..., description="URL of audio file to play") - auto_hangup: bool = Field(default=False, description="Auto hangup after playback") - wait_input_timeout: Optional[int] = Field(default=None, description="Max time to wait for input (seconds)") - - -class InterruptCommand(BaseModel): - """Interrupt command to interrupt current playback.""" - - command: str = Field(default="interrupt", description="Command type") - graceful: bool = Field(default=False, description="Wait for current TTS to complete") - - -class PauseCommand(BaseModel): - """Pause command to pause current playback.""" - - command: str = Field(default="pause", description="Command type") - - -class ResumeCommand(BaseModel): - """Resume command to resume paused playback.""" - - command: str = Field(default="resume", description="Command type") - - -class HangupCommand(BaseModel): - """Hangup command to end the call.""" - - command: str = Field(default="hangup", description="Command type") - reason: Optional[str] = Field(default=None, description="Reason for hangup") - initiator: Optional[str] = Field(default=None, description="Who initiated the hangup") - - -class HistoryCommand(BaseModel): - """History command to add conversation history.""" - - command: str = Field(default="history", description="Command type") - speaker: str = Field(..., description="Speaker identifier") - text: str = Field(..., description="Conversation text") - - -class ChatCommand(BaseModel): - """Chat command for text-based conversation.""" - - command: str = Field(default="chat", description="Command type") - text: str = Field(..., description="Chat text message") - - -# Command type mapping -COMMAND_TYPES = { - "invite": InviteCommand, - "accept": AcceptCommand, - "reject": RejectCommand, - "ringing": RingingCommand, - "tts": TTSCommand, - "play": PlayCommand, - "interrupt": InterruptCommand, - "pause": PauseCommand, - "resume": ResumeCommand, - "hangup": HangupCommand, - "history": HistoryCommand, - "chat": ChatCommand, -} - - -def parse_command(data: Dict[str, Any]) -> BaseModel: - """ - Parse a command from JSON data. - - Args: - data: JSON data as dictionary - - Returns: - Parsed command model - - Raises: - ValueError: If command type is unknown - """ - command_type = data.get("command") - - if not command_type: - raise ValueError("Missing 'command' field") - - command_class = COMMAND_TYPES.get(command_type) - - if not command_class: - raise ValueError(f"Unknown command type: {command_type}") - - return command_class(**data) diff --git a/engine/models/config.py b/engine/models/config.py deleted file mode 100644 index 009411e..0000000 --- a/engine/models/config.py +++ /dev/null @@ -1,126 +0,0 @@ -"""Configuration models for call options.""" - -from typing import Optional, Dict, Any, List -from pydantic import BaseModel, Field - - -class VADOption(BaseModel): - """Voice Activity Detection configuration.""" - - type: str = Field(default="silero", description="VAD algorithm type (silero, webrtc)") - samplerate: int = Field(default=16000, description="Audio sample rate for VAD") - speech_padding: int = Field(default=250, description="Speech padding in milliseconds") - silence_padding: int = Field(default=100, description="Silence padding in milliseconds") - ratio: float = Field(default=0.5, description="Voice detection ratio threshold") - voice_threshold: float = Field(default=0.5, description="Voice energy threshold") - max_buffer_duration_secs: int = Field(default=50, description="Maximum buffer duration in seconds") - silence_timeout: Optional[int] = Field(default=None, description="Silence timeout in milliseconds") - endpoint: Optional[str] = Field(default=None, description="Custom VAD service endpoint") - secret_key: Optional[str] = Field(default=None, description="VAD service secret key") - secret_id: Optional[str] = Field(default=None, description="VAD service secret ID") - - -class ASROption(BaseModel): - """Automatic Speech Recognition configuration.""" - - provider: str = Field(..., description="ASR provider (tencent, aliyun, openai, etc.)") - language: Optional[str] = Field(default=None, description="Language code (zh-CN, en-US)") - app_id: Optional[str] = Field(default=None, description="Application ID") - secret_id: Optional[str] = Field(default=None, description="Secret ID for authentication") - secret_key: Optional[str] = Field(default=None, description="Secret key for authentication") - model_type: Optional[str] = Field(default=None, description="ASR model type (16k_zh, 8k_en)") - buffer_size: Optional[int] = Field(default=None, description="Audio buffer size in bytes") - samplerate: Optional[int] = Field(default=None, description="Audio sample rate") - endpoint: Optional[str] = Field(default=None, description="Custom ASR service endpoint") - extra: Optional[Dict[str, Any]] = Field(default=None, description="Additional parameters") - start_when_answer: bool = Field(default=False, description="Start ASR when call is answered") - - -class TTSOption(BaseModel): - """Text-to-Speech configuration.""" - - samplerate: Optional[int] = Field(default=None, description="TTS output sample rate") - provider: str = Field(default="msedge", description="TTS provider (tencent, aliyun, deepgram, msedge)") - speed: float = Field(default=1.0, description="Speech speed multiplier") - app_id: Optional[str] = Field(default=None, description="Application ID") - secret_id: Optional[str] = Field(default=None, description="Secret ID for authentication") - secret_key: Optional[str] = Field(default=None, description="Secret key for authentication") - volume: Optional[int] = Field(default=None, description="Speech volume level (1-10)") - speaker: Optional[str] = Field(default=None, description="Voice speaker name") - codec: Optional[str] = Field(default=None, description="Audio codec") - subtitle: bool = Field(default=False, description="Enable subtitle generation") - emotion: Optional[str] = Field(default=None, description="Speech emotion") - endpoint: Optional[str] = Field(default=None, description="Custom TTS service endpoint") - extra: Optional[Dict[str, Any]] = Field(default=None, description="Additional parameters") - max_concurrent_tasks: Optional[int] = Field(default=None, description="Max concurrent tasks") - - -class RecorderOption(BaseModel): - """Call recording configuration.""" - - recorder_file: str = Field(..., description="Path to recording file") - samplerate: int = Field(default=16000, description="Recording sample rate") - ptime: int = Field(default=200, description="Packet time in milliseconds") - - -class MediaPassOption(BaseModel): - """Media pass-through configuration for external audio processing.""" - - url: str = Field(..., description="WebSocket URL for media streaming") - input_sample_rate: int = Field(default=16000, description="Sample rate of audio received from WebSocket") - output_sample_rate: int = Field(default=16000, description="Sample rate of audio sent to WebSocket") - packet_size: int = Field(default=2560, description="Packet size in bytes") - ptime: Optional[int] = Field(default=None, description="Buffered playback period in milliseconds") - - -class SipOption(BaseModel): - """SIP protocol configuration.""" - - username: Optional[str] = Field(default=None, description="SIP username") - password: Optional[str] = Field(default=None, description="SIP password") - realm: Optional[str] = Field(default=None, description="SIP realm/domain") - headers: Optional[Dict[str, str]] = Field(default=None, description="Additional SIP headers") - - -class HandlerRule(BaseModel): - """Handler routing rule.""" - - caller: Optional[str] = Field(default=None, description="Caller pattern (regex)") - callee: Optional[str] = Field(default=None, description="Callee pattern (regex)") - playbook: Optional[str] = Field(default=None, description="Playbook file path") - webhook: Optional[str] = Field(default=None, description="Webhook URL") - - -class CallOption(BaseModel): - """Comprehensive call configuration options.""" - - # Basic options - denoise: bool = Field(default=False, description="Enable noise reduction") - offer: Optional[str] = Field(default=None, description="SDP offer string") - callee: Optional[str] = Field(default=None, description="Callee SIP URI or phone number") - caller: Optional[str] = Field(default=None, description="Caller SIP URI or phone number") - - # Audio codec - codec: str = Field(default="pcm", description="Audio codec (pcm, pcma, pcmu, g722)") - - # Component configurations - recorder: Optional[RecorderOption] = Field(default=None, description="Call recording config") - asr: Optional[ASROption] = Field(default=None, description="ASR configuration") - vad: Optional[VADOption] = Field(default=None, description="VAD configuration") - tts: Optional[TTSOption] = Field(default=None, description="TTS configuration") - media_pass: Optional[MediaPassOption] = Field(default=None, description="Media pass-through config") - sip: Optional[SipOption] = Field(default=None, description="SIP configuration") - - # Timeouts and networking - handshake_timeout: Optional[int] = Field(default=None, description="Handshake timeout in seconds") - enable_ipv6: bool = Field(default=False, description="Enable IPv6 support") - inactivity_timeout: Optional[int] = Field(default=None, description="Inactivity timeout in seconds") - - # EOU configuration - eou: Optional[Dict[str, Any]] = Field(default=None, description="End of utterance detection config") - - # Extra parameters - extra: Optional[Dict[str, Any]] = Field(default=None, description="Additional custom parameters") - - class Config: - populate_by_name = True diff --git a/engine/models/events.py b/engine/models/events.py deleted file mode 100644 index 031b8be..0000000 --- a/engine/models/events.py +++ /dev/null @@ -1,231 +0,0 @@ -"""Protocol event models matching the original active-call API.""" - -from typing import Optional, Dict, Any -from pydantic import BaseModel, Field -from datetime import datetime - - -def current_timestamp_ms() -> int: - """Get current timestamp in milliseconds.""" - return int(datetime.now().timestamp() * 1000) - - -# Base Event Model -class BaseEvent(BaseModel): - """Base event model.""" - - event: str = Field(..., description="Event type") - track_id: str = Field(..., description="Unique track identifier") - timestamp: int = Field(default_factory=current_timestamp_ms, description="Event timestamp in milliseconds") - - -# Lifecycle Events -class IncomingEvent(BaseEvent): - """Incoming call event (SIP only).""" - - event: str = Field(default="incoming", description="Event type") - caller: Optional[str] = Field(default=None, description="Caller's SIP URI") - callee: Optional[str] = Field(default=None, description="Callee's SIP URI") - sdp: Optional[str] = Field(default=None, description="SDP offer from caller") - - -class AnswerEvent(BaseEvent): - """Call answered event.""" - - event: str = Field(default="answer", description="Event type") - sdp: Optional[str] = Field(default=None, description="SDP answer from server") - - -class RejectEvent(BaseEvent): - """Call rejected event.""" - - event: str = Field(default="reject", description="Event type") - reason: Optional[str] = Field(default=None, description="Rejection reason") - code: Optional[int] = Field(default=None, description="SIP response code") - - -class RingingEvent(BaseEvent): - """Call ringing event.""" - - event: str = Field(default="ringing", description="Event type") - early_media: bool = Field(default=False, description="Early media available") - - -class HangupEvent(BaseModel): - """Call hangup event.""" - - event: str = Field(default="hangup", description="Event type") - timestamp: int = Field(default_factory=current_timestamp_ms, description="Event timestamp") - reason: Optional[str] = Field(default=None, description="Hangup reason") - initiator: Optional[str] = Field(default=None, description="Who initiated hangup") - start_time: Optional[str] = Field(default=None, description="Call start time (ISO 8601)") - hangup_time: Optional[str] = Field(default=None, description="Hangup time (ISO 8601)") - answer_time: Optional[str] = Field(default=None, description="Answer time (ISO 8601)") - ringing_time: Optional[str] = Field(default=None, description="Ringing time (ISO 8601)") - from_: Optional[Dict[str, Any]] = Field(default=None, alias="from", description="Caller info") - to: Optional[Dict[str, Any]] = Field(default=None, description="Callee info") - extra: Optional[Dict[str, Any]] = Field(default=None, description="Additional metadata") - - class Config: - populate_by_name = True - - -# VAD Events -class SpeakingEvent(BaseEvent): - """Speech detected event.""" - - event: str = Field(default="speaking", description="Event type") - start_time: int = Field(default_factory=current_timestamp_ms, description="Speech start time") - - -class SilenceEvent(BaseEvent): - """Silence detected event.""" - - event: str = Field(default="silence", description="Event type") - start_time: int = Field(default_factory=current_timestamp_ms, description="Silence start time") - duration: int = Field(default=0, description="Silence duration in milliseconds") - - -# AI/ASR Events -class AsrFinalEvent(BaseEvent): - """ASR final transcription event.""" - - event: str = Field(default="asrFinal", description="Event type") - index: int = Field(..., description="ASR result sequence number") - start_time: Optional[int] = Field(default=None, description="Speech start time") - end_time: Optional[int] = Field(default=None, description="Speech end time") - text: str = Field(..., description="Transcribed text") - - -class AsrDeltaEvent(BaseEvent): - """ASR partial transcription event (streaming).""" - - event: str = Field(default="asrDelta", description="Event type") - index: int = Field(..., description="ASR result sequence number") - start_time: Optional[int] = Field(default=None, description="Speech start time") - end_time: Optional[int] = Field(default=None, description="Speech end time") - text: str = Field(..., description="Partial transcribed text") - - -class EouEvent(BaseEvent): - """End of utterance detection event.""" - - event: str = Field(default="eou", description="Event type") - completed: bool = Field(default=True, description="Whether utterance was completed") - - -# Audio Track Events -class TrackStartEvent(BaseEvent): - """Audio track start event.""" - - event: str = Field(default="trackStart", description="Event type") - play_id: Optional[str] = Field(default=None, description="Play ID from TTS/Play command") - - -class TrackEndEvent(BaseEvent): - """Audio track end event.""" - - event: str = Field(default="trackEnd", description="Event type") - duration: int = Field(..., description="Track duration in milliseconds") - ssrc: int = Field(..., description="RTP SSRC identifier") - play_id: Optional[str] = Field(default=None, description="Play ID from TTS/Play command") - - -class InterruptionEvent(BaseEvent): - """Playback interruption event.""" - - event: str = Field(default="interruption", description="Event type") - play_id: Optional[str] = Field(default=None, description="Play ID that was interrupted") - subtitle: Optional[str] = Field(default=None, description="TTS text being played") - position: Optional[int] = Field(default=None, description="Word index position") - total_duration: Optional[int] = Field(default=None, description="Total TTS duration") - current: Optional[int] = Field(default=None, description="Elapsed time when interrupted") - - -# System Events -class ErrorEvent(BaseEvent): - """Error event.""" - - event: str = Field(default="error", description="Event type") - sender: str = Field(..., description="Component that generated the error") - error: str = Field(..., description="Error message") - code: Optional[int] = Field(default=None, description="Error code") - - -class MetricsEvent(BaseModel): - """Performance metrics event.""" - - event: str = Field(default="metrics", description="Event type") - timestamp: int = Field(default_factory=current_timestamp_ms, description="Event timestamp") - key: str = Field(..., description="Metric key") - duration: int = Field(..., description="Duration in milliseconds") - data: Optional[Dict[str, Any]] = Field(default=None, description="Additional metric data") - - -class AddHistoryEvent(BaseModel): - """Conversation history entry added event.""" - - event: str = Field(default="addHistory", description="Event type") - timestamp: int = Field(default_factory=current_timestamp_ms, description="Event timestamp") - sender: Optional[str] = Field(default=None, description="Component that added history") - speaker: str = Field(..., description="Speaker identifier") - text: str = Field(..., description="Conversation text") - - -class DTMFEvent(BaseEvent): - """DTMF tone detected event.""" - - event: str = Field(default="dtmf", description="Event type") - digit: str = Field(..., description="DTMF digit (0-9, *, #, A-D)") - - -class HeartBeatEvent(BaseModel): - """Server-to-client heartbeat to keep connection alive.""" - - event: str = Field(default="heartBeat", description="Event type") - timestamp: int = Field(default_factory=current_timestamp_ms, description="Event timestamp in milliseconds") - - -# Event type mapping -EVENT_TYPES = { - "incoming": IncomingEvent, - "answer": AnswerEvent, - "reject": RejectEvent, - "ringing": RingingEvent, - "hangup": HangupEvent, - "speaking": SpeakingEvent, - "silence": SilenceEvent, - "asrFinal": AsrFinalEvent, - "asrDelta": AsrDeltaEvent, - "eou": EouEvent, - "trackStart": TrackStartEvent, - "trackEnd": TrackEndEvent, - "interruption": InterruptionEvent, - "error": ErrorEvent, - "metrics": MetricsEvent, - "addHistory": AddHistoryEvent, - "dtmf": DTMFEvent, - "heartBeat": HeartBeatEvent, -} - - -def create_event(event_type: str, **kwargs) -> BaseModel: - """ - Create an event model. - - Args: - event_type: Type of event to create - **kwargs: Event fields - - Returns: - Event model instance - - Raises: - ValueError: If event type is unknown - """ - event_class = EVENT_TYPES.get(event_type) - - if not event_class: - raise ValueError(f"Unknown event type: {event_type}") - - return event_class(event=event_type, **kwargs) diff --git a/engine/protocol/__init__.py b/engine/protocol/__init__.py new file mode 100644 index 0000000..311e510 --- /dev/null +++ b/engine/protocol/__init__.py @@ -0,0 +1 @@ +"""Protocol package.""" diff --git a/engine/protocol/ws_v1/__init__.py b/engine/protocol/ws_v1/__init__.py new file mode 100644 index 0000000..6b76589 --- /dev/null +++ b/engine/protocol/ws_v1/__init__.py @@ -0,0 +1 @@ +"""WS v1 protocol package.""" diff --git a/engine/models/ws_v1.py b/engine/protocol/ws_v1/schema.py similarity index 100% rename from engine/models/ws_v1.py rename to engine/protocol/ws_v1/schema.py diff --git a/engine/providers/__init__.py b/engine/providers/__init__.py new file mode 100644 index 0000000..2209974 --- /dev/null +++ b/engine/providers/__init__.py @@ -0,0 +1 @@ +"""Providers package.""" diff --git a/engine/providers/asr/__init__.py b/engine/providers/asr/__init__.py new file mode 100644 index 0000000..2efe6a9 --- /dev/null +++ b/engine/providers/asr/__init__.py @@ -0,0 +1 @@ +"""ASR providers.""" diff --git a/engine/services/asr.py b/engine/providers/asr/buffered.py similarity index 98% rename from engine/services/asr.py rename to engine/providers/asr/buffered.py index 51ab584..ce1a248 100644 --- a/engine/services/asr.py +++ b/engine/providers/asr/buffered.py @@ -9,7 +9,7 @@ import json from typing import AsyncIterator, Optional from loguru import logger -from services.base import BaseASRService, ASRResult, ServiceState +from providers.common.base import BaseASRService, ASRResult, ServiceState # Try to import websockets for streaming ASR try: diff --git a/engine/services/openai_compatible_asr.py b/engine/providers/asr/openai_compatible.py similarity index 99% rename from engine/services/openai_compatible_asr.py rename to engine/providers/asr/openai_compatible.py index 182d7a0..1a2083b 100644 --- a/engine/services/openai_compatible_asr.py +++ b/engine/providers/asr/openai_compatible.py @@ -19,7 +19,7 @@ except ImportError: AIOHTTP_AVAILABLE = False logger.warning("aiohttp not available - OpenAICompatibleASRService will not work") -from services.base import BaseASRService, ASRResult, ServiceState +from providers.common.base import BaseASRService, ASRResult, ServiceState class OpenAICompatibleASRService(BaseASRService): diff --git a/engine/services/siliconflow_asr.py b/engine/providers/asr/siliconflow.py similarity index 75% rename from engine/services/siliconflow_asr.py rename to engine/providers/asr/siliconflow.py index 2cb95dc..d0aeb50 100644 --- a/engine/services/siliconflow_asr.py +++ b/engine/providers/asr/siliconflow.py @@ -1,6 +1,6 @@ """Backward-compatible imports for legacy siliconflow_asr module.""" -from services.openai_compatible_asr import OpenAICompatibleASRService +from providers.asr.openai_compatible import OpenAICompatibleASRService # Backward-compatible alias SiliconFlowASRService = OpenAICompatibleASRService diff --git a/engine/providers/common/__init__.py b/engine/providers/common/__init__.py new file mode 100644 index 0000000..8550c10 --- /dev/null +++ b/engine/providers/common/__init__.py @@ -0,0 +1 @@ +"""Common provider types.""" diff --git a/engine/services/base.py b/engine/providers/common/base.py similarity index 100% rename from engine/services/base.py rename to engine/providers/common/base.py diff --git a/engine/services/streaming_text.py b/engine/providers/common/streaming_text.py similarity index 100% rename from engine/services/streaming_text.py rename to engine/providers/common/streaming_text.py diff --git a/engine/providers/factory/__init__.py b/engine/providers/factory/__init__.py new file mode 100644 index 0000000..9be8bc5 --- /dev/null +++ b/engine/providers/factory/__init__.py @@ -0,0 +1 @@ +"""Provider factories.""" diff --git a/engine/app/service_factory.py b/engine/providers/factory/default.py similarity index 91% rename from engine/app/service_factory.py rename to engine/providers/factory/default.py index 6bdb64c..4294d3c 100644 --- a/engine/app/service_factory.py +++ b/engine/providers/factory/default.py @@ -6,7 +6,7 @@ from typing import Any from loguru import logger -from core.ports import ( +from runtime.ports import ( ASRPort, ASRServiceSpec, LLMPort, @@ -15,12 +15,12 @@ from core.ports import ( TTSPort, TTSServiceSpec, ) -from services.asr import BufferedASRService -from services.dashscope_tts import DashScopeTTSService -from services.llm import MockLLMService, OpenAILLMService -from services.openai_compatible_asr import OpenAICompatibleASRService -from services.openai_compatible_tts import OpenAICompatibleTTSService -from services.tts import MockTTSService +from providers.asr.buffered import BufferedASRService +from providers.tts.dashscope import DashScopeTTSService +from providers.llm.openai import MockLLMService, OpenAILLMService +from providers.asr.openai_compatible import OpenAICompatibleASRService +from providers.tts.openai_compatible import OpenAICompatibleTTSService +from providers.tts.mock import MockTTSService _OPENAI_COMPATIBLE_PROVIDERS = {"openai_compatible", "openai-compatible", "siliconflow"} _SUPPORTED_LLM_PROVIDERS = {"openai", *_OPENAI_COMPATIBLE_PROVIDERS} diff --git a/engine/providers/llm/__init__.py b/engine/providers/llm/__init__.py new file mode 100644 index 0000000..3258a10 --- /dev/null +++ b/engine/providers/llm/__init__.py @@ -0,0 +1 @@ +"""LLM providers.""" diff --git a/engine/services/llm.py b/engine/providers/llm/openai.py similarity index 98% rename from engine/services/llm.py rename to engine/providers/llm/openai.py index c4d539e..02735fe 100644 --- a/engine/services/llm.py +++ b/engine/providers/llm/openai.py @@ -10,8 +10,8 @@ import uuid from typing import AsyncIterator, Optional, List, Dict, Any, Callable, Awaitable from loguru import logger -from app.backend_adapters import build_backend_adapter_from_settings -from services.base import BaseLLMService, LLMMessage, LLMStreamEvent, ServiceState +from adapters.control_plane.backend import build_backend_adapter_from_settings +from providers.common.base import BaseLLMService, LLMMessage, LLMStreamEvent, ServiceState # Try to import openai try: diff --git a/engine/providers/realtime/__init__.py b/engine/providers/realtime/__init__.py new file mode 100644 index 0000000..0d4cb46 --- /dev/null +++ b/engine/providers/realtime/__init__.py @@ -0,0 +1 @@ +"""Realtime providers.""" diff --git a/engine/services/realtime.py b/engine/providers/realtime/service.py similarity index 100% rename from engine/services/realtime.py rename to engine/providers/realtime/service.py diff --git a/engine/providers/tts/__init__.py b/engine/providers/tts/__init__.py new file mode 100644 index 0000000..531ecfa --- /dev/null +++ b/engine/providers/tts/__init__.py @@ -0,0 +1 @@ +"""TTS providers.""" diff --git a/engine/services/dashscope_tts.py b/engine/providers/tts/dashscope.py similarity index 99% rename from engine/services/dashscope_tts.py rename to engine/providers/tts/dashscope.py index 1ddcbff..c0b3fdb 100644 --- a/engine/services/dashscope_tts.py +++ b/engine/providers/tts/dashscope.py @@ -12,7 +12,7 @@ from typing import Any, AsyncIterator, Dict, Optional, Tuple from loguru import logger -from services.base import BaseTTSService, ServiceState, TTSChunk +from providers.common.base import BaseTTSService, ServiceState, TTSChunk try: import dashscope diff --git a/engine/services/tts.py b/engine/providers/tts/mock.py similarity index 95% rename from engine/services/tts.py rename to engine/providers/tts/mock.py index 0ed629d..1d1e143 100644 --- a/engine/services/tts.py +++ b/engine/providers/tts/mock.py @@ -5,7 +5,7 @@ from typing import AsyncIterator from loguru import logger -from services.base import BaseTTSService, TTSChunk, ServiceState +from providers.common.base import BaseTTSService, TTSChunk, ServiceState class MockTTSService(BaseTTSService): diff --git a/engine/services/openai_compatible_tts.py b/engine/providers/tts/openai_compatible.py similarity index 98% rename from engine/services/openai_compatible_tts.py rename to engine/providers/tts/openai_compatible.py index 41e3e45..767ad12 100644 --- a/engine/services/openai_compatible_tts.py +++ b/engine/providers/tts/openai_compatible.py @@ -13,8 +13,8 @@ from typing import AsyncIterator, Optional from urllib.parse import urlparse, urlunparse from loguru import logger -from services.base import BaseTTSService, TTSChunk, ServiceState -from services.streaming_tts_adapter import StreamingTTSAdapter # backward-compatible re-export +from providers.common.base import BaseTTSService, TTSChunk, ServiceState +from providers.tts.streaming_adapter import StreamingTTSAdapter # backward-compatible re-export class OpenAICompatibleTTSService(BaseTTSService): diff --git a/engine/services/siliconflow_tts.py b/engine/providers/tts/siliconflow.py similarity index 72% rename from engine/services/siliconflow_tts.py rename to engine/providers/tts/siliconflow.py index 3cdf32a..3b894d9 100644 --- a/engine/services/siliconflow_tts.py +++ b/engine/providers/tts/siliconflow.py @@ -1,6 +1,6 @@ """Backward-compatible imports for legacy siliconflow_tts module.""" -from services.openai_compatible_tts import OpenAICompatibleTTSService, StreamingTTSAdapter +from providers.tts.openai_compatible import OpenAICompatibleTTSService, StreamingTTSAdapter # Backward-compatible alias SiliconFlowTTSService = OpenAICompatibleTTSService diff --git a/engine/services/streaming_tts_adapter.py b/engine/providers/tts/streaming_adapter.py similarity index 95% rename from engine/services/streaming_tts_adapter.py rename to engine/providers/tts/streaming_adapter.py index d4cb745..853e7ab 100644 --- a/engine/services/streaming_tts_adapter.py +++ b/engine/providers/tts/streaming_adapter.py @@ -4,8 +4,8 @@ import asyncio from loguru import logger -from services.base import BaseTTSService -from services.streaming_text import extract_tts_sentence, has_spoken_content +from providers.common.base import BaseTTSService +from providers.common.streaming_text import extract_tts_sentence, has_spoken_content class StreamingTTSAdapter: diff --git a/engine/pyproject.toml b/engine/pyproject.toml index 8786905..c0031f0 100644 --- a/engine/pyproject.toml +++ b/engine/pyproject.toml @@ -31,7 +31,17 @@ Issues = "https://github.com/yourusername/py-active-call-cc/issues" [tool.setuptools.packages.find] where = ["."] -include = ["app*"] +include = [ + "app*", + "adapters*", + "protocol*", + "providers*", + "processors*", + "runtime*", + "tools*", + "utils*", + "workflow*", +] exclude = ["tests*", "scripts*", "reference*"] [tool.black] diff --git a/engine/runtime/__init__.py b/engine/runtime/__init__.py new file mode 100644 index 0000000..1364082 --- /dev/null +++ b/engine/runtime/__init__.py @@ -0,0 +1 @@ +"""Runtime package.""" diff --git a/engine/core/conversation.py b/engine/runtime/conversation.py similarity index 99% rename from engine/core/conversation.py rename to engine/runtime/conversation.py index 08b23c6..fe21c01 100644 --- a/engine/core/conversation.py +++ b/engine/runtime/conversation.py @@ -10,7 +10,7 @@ from dataclasses import dataclass, field from enum import Enum from loguru import logger -from services.base import LLMMessage +from providers.common.base import LLMMessage class ConversationState(Enum): diff --git a/engine/core/events.py b/engine/runtime/events.py similarity index 100% rename from engine/core/events.py rename to engine/runtime/events.py diff --git a/engine/runtime/history/__init__.py b/engine/runtime/history/__init__.py new file mode 100644 index 0000000..44329ff --- /dev/null +++ b/engine/runtime/history/__init__.py @@ -0,0 +1 @@ +"""Runtime history package.""" diff --git a/engine/core/history_bridge.py b/engine/runtime/history/bridge.py similarity index 99% rename from engine/core/history_bridge.py rename to engine/runtime/history/bridge.py index 70a681b..bacd682 100644 --- a/engine/core/history_bridge.py +++ b/engine/runtime/history/bridge.py @@ -9,7 +9,7 @@ from typing import Optional from loguru import logger -from core.ports import ConversationHistoryStore +from runtime.ports import ConversationHistoryStore @dataclass diff --git a/engine/runtime/pipeline/__init__.py b/engine/runtime/pipeline/__init__.py new file mode 100644 index 0000000..8861a22 --- /dev/null +++ b/engine/runtime/pipeline/__init__.py @@ -0,0 +1 @@ +"""Runtime pipeline package.""" diff --git a/engine/runtime/pipeline/asr_flow.py b/engine/runtime/pipeline/asr_flow.py new file mode 100644 index 0000000..1b539f5 --- /dev/null +++ b/engine/runtime/pipeline/asr_flow.py @@ -0,0 +1,13 @@ +"""ASR flow helpers extracted from the duplex pipeline. + +This module is intentionally lightweight for phase-wise migration. +""" + +from __future__ import annotations + +from providers.common.base import ASRResult + + +def is_final_result(result: ASRResult) -> bool: + """Return whether an ASR result is final.""" + return bool(result.is_final) diff --git a/engine/runtime/pipeline/constants.py b/engine/runtime/pipeline/constants.py new file mode 100644 index 0000000..0109925 --- /dev/null +++ b/engine/runtime/pipeline/constants.py @@ -0,0 +1,6 @@ +"""Shared constants for the runtime duplex pipeline.""" + +TRACK_AUDIO_IN = "audio_in" +TRACK_AUDIO_OUT = "audio_out" +TRACK_CONTROL = "control" +PCM_FRAME_BYTES = 640 # 16k mono pcm_s16le, 20ms diff --git a/engine/core/duplex_pipeline.py b/engine/runtime/pipeline/duplex.py similarity index 99% rename from engine/core/duplex_pipeline.py rename to engine/runtime/pipeline/duplex.py index bbf3d47..aacd0c7 100644 --- a/engine/core/duplex_pipeline.py +++ b/engine/runtime/pipeline/duplex.py @@ -26,10 +26,10 @@ import aiohttp from loguru import logger from app.config import settings -from app.service_factory import DefaultRealtimeServiceFactory -from core.conversation import ConversationManager, ConversationState -from core.events import get_event_bus -from core.ports import ( +from providers.factory.default import DefaultRealtimeServiceFactory +from runtime.conversation import ConversationManager, ConversationState +from runtime.events import get_event_bus +from runtime.ports import ( ASRPort, ASRServiceSpec, LLMPort, @@ -38,13 +38,13 @@ from core.ports import ( TTSPort, TTSServiceSpec, ) -from core.tool_executor import execute_server_tool -from core.transports import BaseTransport -from models.ws_v1 import ev +from tools.executor import execute_server_tool +from runtime.transports import BaseTransport +from protocol.ws_v1.schema import ev from processors.eou import EouDetector from processors.vad import SileroVAD, VADProcessor -from services.base import LLMMessage, LLMStreamEvent -from services.streaming_text import extract_tts_sentence, has_spoken_content +from providers.common.base import LLMMessage, LLMStreamEvent +from providers.common.streaming_text import extract_tts_sentence, has_spoken_content class DuplexPipeline: diff --git a/engine/runtime/pipeline/events_out.py b/engine/runtime/pipeline/events_out.py new file mode 100644 index 0000000..dabbafb --- /dev/null +++ b/engine/runtime/pipeline/events_out.py @@ -0,0 +1,12 @@ +"""Output-event shaping helpers for the runtime pipeline.""" + +from __future__ import annotations + +from typing import Any, Dict + + +def assistant_text_delta_event(text: str, **extra: Any) -> Dict[str, Any]: + """Build a normalized assistant text delta payload.""" + payload: Dict[str, Any] = {"type": "assistant.text.delta", "text": str(text)} + payload.update(extra) + return payload diff --git a/engine/runtime/pipeline/interrupts.py b/engine/runtime/pipeline/interrupts.py new file mode 100644 index 0000000..1960d56 --- /dev/null +++ b/engine/runtime/pipeline/interrupts.py @@ -0,0 +1,8 @@ +"""Interruption-related helpers extracted from the duplex pipeline.""" + +from __future__ import annotations + + +def should_interrupt(min_duration_ms: int, detected_ms: int) -> bool: + """Decide whether interruption conditions are met.""" + return int(detected_ms) >= max(0, int(min_duration_ms)) diff --git a/engine/runtime/pipeline/llm_flow.py b/engine/runtime/pipeline/llm_flow.py new file mode 100644 index 0000000..d938fb0 --- /dev/null +++ b/engine/runtime/pipeline/llm_flow.py @@ -0,0 +1,13 @@ +"""LLM flow helpers extracted from the duplex pipeline. + +This module is intentionally lightweight for phase-wise migration. +""" + +from __future__ import annotations + +from providers.common.base import LLMStreamEvent + + +def is_done_event(event: LLMStreamEvent) -> bool: + """Return whether an LLM stream event signals completion.""" + return str(event.type) == "done" diff --git a/engine/runtime/pipeline/tooling.py b/engine/runtime/pipeline/tooling.py new file mode 100644 index 0000000..459dceb --- /dev/null +++ b/engine/runtime/pipeline/tooling.py @@ -0,0 +1,13 @@ +"""Tooling helpers extracted from the duplex pipeline.""" + +from __future__ import annotations + +from typing import Any + + +def normalize_tool_name(name: Any, aliases: dict[str, str]) -> str: + """Normalize tool name with alias mapping.""" + normalized = str(name or "").strip() + if not normalized: + return "" + return aliases.get(normalized, normalized) diff --git a/engine/runtime/pipeline/tts_flow.py b/engine/runtime/pipeline/tts_flow.py new file mode 100644 index 0000000..156547e --- /dev/null +++ b/engine/runtime/pipeline/tts_flow.py @@ -0,0 +1,15 @@ +"""TTS flow helpers extracted from the duplex pipeline. + +This module is intentionally lightweight for phase-wise migration. +""" + +from __future__ import annotations + +from providers.common.base import TTSChunk + + +def chunk_duration_ms(chunk: TTSChunk) -> float: + """Estimate chunk duration in milliseconds for pcm16 mono.""" + if chunk.sample_rate <= 0: + return 0.0 + return (len(chunk.audio) / 2.0 / float(chunk.sample_rate)) * 1000.0 diff --git a/engine/core/ports/__init__.py b/engine/runtime/ports/__init__.py similarity index 56% rename from engine/core/ports/__init__.py rename to engine/runtime/ports/__init__.py index 2ae96c4..a7cbce3 100644 --- a/engine/core/ports/__init__.py +++ b/engine/runtime/ports/__init__.py @@ -1,16 +1,16 @@ -"""Port interfaces for engine-side integration boundaries.""" +"""Port interfaces for runtime integration boundaries.""" -from core.ports.asr import ASRBufferControl, ASRInterimControl, ASRPort, ASRServiceSpec -from core.ports.control_plane import ( +from runtime.ports.asr import ASRBufferControl, ASRInterimControl, ASRPort, ASRServiceSpec +from runtime.ports.control_plane import ( AssistantRuntimeConfigProvider, ControlPlaneGateway, ConversationHistoryStore, KnowledgeRetriever, ToolCatalog, ) -from core.ports.llm import LLMCancellable, LLMPort, LLMRuntimeConfigurable, LLMServiceSpec -from core.ports.service_factory import RealtimeServiceFactory -from core.ports.tts import TTSPort, TTSServiceSpec +from runtime.ports.llm import LLMCancellable, LLMPort, LLMRuntimeConfigurable, LLMServiceSpec +from runtime.ports.service_factory import RealtimeServiceFactory +from runtime.ports.tts import TTSPort, TTSServiceSpec __all__ = [ "ASRPort", diff --git a/engine/core/ports/asr.py b/engine/runtime/ports/asr.py similarity index 97% rename from engine/core/ports/asr.py rename to engine/runtime/ports/asr.py index fa302cd..8621ed0 100644 --- a/engine/core/ports/asr.py +++ b/engine/runtime/ports/asr.py @@ -5,7 +5,7 @@ from __future__ import annotations from dataclasses import dataclass from typing import AsyncIterator, Awaitable, Callable, Optional, Protocol -from services.base import ASRResult +from providers.common.base import ASRResult TranscriptCallback = Callable[[str, bool], Awaitable[None]] diff --git a/engine/core/ports/control_plane.py b/engine/runtime/ports/control_plane.py similarity index 100% rename from engine/core/ports/control_plane.py rename to engine/runtime/ports/control_plane.py diff --git a/engine/core/ports/llm.py b/engine/runtime/ports/llm.py similarity index 96% rename from engine/core/ports/llm.py rename to engine/runtime/ports/llm.py index ca515ac..a591985 100644 --- a/engine/core/ports/llm.py +++ b/engine/runtime/ports/llm.py @@ -5,7 +5,7 @@ from __future__ import annotations from dataclasses import dataclass, field from typing import Any, AsyncIterator, Awaitable, Callable, Dict, List, Optional, Protocol -from services.base import LLMMessage, LLMStreamEvent +from providers.common.base import LLMMessage, LLMStreamEvent KnowledgeRetrieverFn = Callable[..., Awaitable[List[Dict[str, Any]]]] diff --git a/engine/core/ports/service_factory.py b/engine/runtime/ports/service_factory.py similarity index 79% rename from engine/core/ports/service_factory.py rename to engine/runtime/ports/service_factory.py index d1d8476..7ce8b77 100644 --- a/engine/core/ports/service_factory.py +++ b/engine/runtime/ports/service_factory.py @@ -4,9 +4,9 @@ from __future__ import annotations from typing import Protocol -from core.ports.asr import ASRPort, ASRServiceSpec -from core.ports.llm import LLMPort, LLMServiceSpec -from core.ports.tts import TTSPort, TTSServiceSpec +from runtime.ports.asr import ASRPort, ASRServiceSpec +from runtime.ports.llm import LLMPort, LLMServiceSpec +from runtime.ports.tts import TTSPort, TTSServiceSpec class RealtimeServiceFactory(Protocol): diff --git a/engine/core/ports/tts.py b/engine/runtime/ports/tts.py similarity index 96% rename from engine/core/ports/tts.py rename to engine/runtime/ports/tts.py index 0693cdb..523dc3c 100644 --- a/engine/core/ports/tts.py +++ b/engine/runtime/ports/tts.py @@ -5,7 +5,7 @@ from __future__ import annotations from dataclasses import dataclass from typing import AsyncIterator, Optional, Protocol -from services.base import TTSChunk +from providers.common.base import TTSChunk @dataclass(frozen=True) diff --git a/engine/runtime/session/__init__.py b/engine/runtime/session/__init__.py new file mode 100644 index 0000000..d224fb9 --- /dev/null +++ b/engine/runtime/session/__init__.py @@ -0,0 +1 @@ +"""Runtime session package.""" diff --git a/engine/runtime/session/lifecycle.py b/engine/runtime/session/lifecycle.py new file mode 100644 index 0000000..9fd8ebf --- /dev/null +++ b/engine/runtime/session/lifecycle.py @@ -0,0 +1,10 @@ +"""Lifecycle helper utilities for runtime sessions.""" + +from __future__ import annotations + +from datetime import datetime, timezone + + +def utc_now_iso() -> str: + """Return current UTC timestamp in ISO 8601 format.""" + return datetime.now(timezone.utc).isoformat() diff --git a/engine/core/session.py b/engine/runtime/session/manager.py similarity index 98% rename from engine/core/session.py rename to engine/runtime/session/manager.py index eff77a6..1f65ea7 100644 --- a/engine/core/session.py +++ b/engine/runtime/session/manager.py @@ -9,22 +9,22 @@ from enum import Enum from typing import Optional, Dict, Any, List from loguru import logger -from app.backend_adapters import build_backend_adapter_from_settings -from core.transports import BaseTransport -from core.ports import ( +from adapters.control_plane.backend import build_backend_adapter_from_settings +from runtime.transports import BaseTransport +from runtime.ports import ( AssistantRuntimeConfigProvider, ControlPlaneGateway, ConversationHistoryStore, KnowledgeRetriever, ToolCatalog, ) -from core.duplex_pipeline import DuplexPipeline -from core.conversation import ConversationTurn -from core.history_bridge import SessionHistoryBridge -from core.workflow_runner import WorkflowRunner, WorkflowTransition, WorkflowNodeDef, WorkflowEdgeDef +from runtime.pipeline.duplex import DuplexPipeline +from runtime.conversation import ConversationTurn +from runtime.history.bridge import SessionHistoryBridge +from workflow.runner import WorkflowRunner, WorkflowTransition, WorkflowNodeDef, WorkflowEdgeDef from app.config import settings -from services.base import LLMMessage -from models.ws_v1 import ( +from providers.common.base import LLMMessage +from protocol.ws_v1.schema import ( parse_client_message, ev, SessionStartMessage, diff --git a/engine/runtime/session/metadata.py b/engine/runtime/session/metadata.py new file mode 100644 index 0000000..ab32971 --- /dev/null +++ b/engine/runtime/session/metadata.py @@ -0,0 +1,9 @@ +"""Metadata helpers extracted from session manager.""" + +from __future__ import annotations + +import re +from typing import Pattern + +DYNAMIC_VARIABLE_KEY_RE: Pattern[str] = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]{0,63}$") +DYNAMIC_VARIABLE_PLACEHOLDER_RE: Pattern[str] = re.compile(r"\{\{\s*([a-zA-Z_][a-zA-Z0-9_]*)\s*\}\}") diff --git a/engine/runtime/session/workflow_bridge.py b/engine/runtime/session/workflow_bridge.py new file mode 100644 index 0000000..e03d939 --- /dev/null +++ b/engine/runtime/session/workflow_bridge.py @@ -0,0 +1,12 @@ +"""Workflow bridge helpers for runtime session orchestration.""" + +from __future__ import annotations + +from typing import Optional + +from workflow.runner import WorkflowRunner + + +def has_active_workflow(workflow_runner: Optional[WorkflowRunner]) -> bool: + """Return whether a workflow runner exists and has a current node.""" + return bool(workflow_runner and workflow_runner.current_node is not None) diff --git a/engine/core/transports.py b/engine/runtime/transports.py similarity index 100% rename from engine/core/transports.py rename to engine/runtime/transports.py diff --git a/engine/services/__init__.py b/engine/services/__init__.py deleted file mode 100644 index f64ef05..0000000 --- a/engine/services/__init__.py +++ /dev/null @@ -1,52 +0,0 @@ -"""AI Services package. - -Provides ASR, LLM, TTS, and Realtime API services for voice conversation. -""" - -from services.base import ( - ServiceState, - ASRResult, - LLMMessage, - TTSChunk, - BaseASRService, - BaseLLMService, - BaseTTSService, -) -from services.llm import OpenAILLMService, MockLLMService -from services.dashscope_tts import DashScopeTTSService -from services.tts import MockTTSService -from services.asr import BufferedASRService, MockASRService -from services.openai_compatible_asr import OpenAICompatibleASRService, SiliconFlowASRService -from services.openai_compatible_tts import OpenAICompatibleTTSService, SiliconFlowTTSService -from services.streaming_tts_adapter import StreamingTTSAdapter -from services.realtime import RealtimeService, RealtimeConfig, RealtimePipeline - -__all__ = [ - # Base classes - "ServiceState", - "ASRResult", - "LLMMessage", - "TTSChunk", - "BaseASRService", - "BaseLLMService", - "BaseTTSService", - # LLM - "OpenAILLMService", - "MockLLMService", - # TTS - "DashScopeTTSService", - "MockTTSService", - # ASR - "BufferedASRService", - "MockASRService", - "OpenAICompatibleASRService", - "SiliconFlowASRService", - # TTS (SiliconFlow) - "OpenAICompatibleTTSService", - "SiliconFlowTTSService", - "StreamingTTSAdapter", - # Realtime - "RealtimeService", - "RealtimeConfig", - "RealtimePipeline", -] diff --git a/engine/tests/test_backend_adapters.py b/engine/tests/test_backend_adapters.py index 347df45..70f569e 100644 --- a/engine/tests/test_backend_adapters.py +++ b/engine/tests/test_backend_adapters.py @@ -1,7 +1,7 @@ import aiohttp import pytest -from app.backend_adapters import ( +from adapters.control_plane.backend import ( AssistantConfigSourceAdapter, LocalYamlAssistantConfigAdapter, build_backend_adapter, @@ -120,7 +120,7 @@ async def test_http_backend_adapter_create_call_record_posts_expected_payload(mo }, ) - monkeypatch.setattr("app.backend_adapters.aiohttp.ClientSession", _FakeClientSession) + monkeypatch.setattr("adapters.control_plane.backend.aiohttp.ClientSession", _FakeClientSession) config_dir = tmp_path / "assistants" config_dir.mkdir(parents=True, exist_ok=True) @@ -198,7 +198,7 @@ async def test_with_backend_url_uses_backend_for_assistant_config(monkeypatch, t _ = (url, json) return _FakeResponse(status=200, payload={"id": "call_1"}) - monkeypatch.setattr("app.backend_adapters.aiohttp.ClientSession", _FakeClientSession) + monkeypatch.setattr("adapters.control_plane.backend.aiohttp.ClientSession", _FakeClientSession) config_dir = tmp_path / "assistants" config_dir.mkdir(parents=True, exist_ok=True) @@ -234,7 +234,7 @@ async def test_backend_mode_disabled_uses_local_assistant_config_even_with_url(m _ = timeout raise AssertionError("HTTP client should not be created when backend_mode=disabled") - monkeypatch.setattr("app.backend_adapters.aiohttp.ClientSession", _FailIfCalledClientSession) + monkeypatch.setattr("adapters.control_plane.backend.aiohttp.ClientSession", _FailIfCalledClientSession) config_dir = tmp_path / "assistants" config_dir.mkdir(parents=True, exist_ok=True) diff --git a/engine/tests/test_dynamic_variables.py b/engine/tests/test_dynamic_variables.py index f7982c7..d6744af 100644 --- a/engine/tests/test_dynamic_variables.py +++ b/engine/tests/test_dynamic_variables.py @@ -1,4 +1,4 @@ -from core.session import Session +from runtime.session.manager import Session def _session() -> Session: diff --git a/engine/tests/test_history_bridge.py b/engine/tests/test_history_bridge.py index 2f9dd80..d70fa6e 100644 --- a/engine/tests/test_history_bridge.py +++ b/engine/tests/test_history_bridge.py @@ -3,7 +3,7 @@ import time import pytest -from core.history_bridge import SessionHistoryBridge +from runtime.history.bridge import SessionHistoryBridge class _FakeHistoryWriter: diff --git a/engine/tests/test_tool_call_flow.py b/engine/tests/test_tool_call_flow.py index 11a7b77..d820643 100644 --- a/engine/tests/test_tool_call_flow.py +++ b/engine/tests/test_tool_call_flow.py @@ -4,10 +4,10 @@ from typing import Any, Dict, List import pytest -from core.conversation import ConversationState -from core.duplex_pipeline import DuplexPipeline -from models.ws_v1 import OutputAudioPlayedMessage, ToolCallResultsMessage, parse_client_message -from services.base import LLMStreamEvent +from runtime.conversation import ConversationState +from runtime.pipeline.duplex import DuplexPipeline +from protocol.ws_v1.schema import OutputAudioPlayedMessage, ToolCallResultsMessage, parse_client_message +from providers.common.base import LLMStreamEvent class _DummySileroVAD: @@ -86,9 +86,9 @@ class _CaptureGenerateLLM: def _build_pipeline(monkeypatch, llm_rounds: List[List[LLMStreamEvent]]) -> tuple[DuplexPipeline, List[Dict[str, Any]]]: - monkeypatch.setattr("core.duplex_pipeline.SileroVAD", _DummySileroVAD) - monkeypatch.setattr("core.duplex_pipeline.VADProcessor", _DummyVADProcessor) - monkeypatch.setattr("core.duplex_pipeline.EouDetector", _DummyEouDetector) + monkeypatch.setattr("runtime.pipeline.duplex.SileroVAD", _DummySileroVAD) + monkeypatch.setattr("runtime.pipeline.duplex.VADProcessor", _DummyVADProcessor) + monkeypatch.setattr("runtime.pipeline.duplex.EouDetector", _DummyEouDetector) pipeline = DuplexPipeline( transport=_FakeTransport(), @@ -112,7 +112,7 @@ def _build_pipeline(monkeypatch, llm_rounds: List[List[LLMStreamEvent]]) -> tupl def test_pipeline_uses_default_tools_from_settings(monkeypatch): monkeypatch.setattr( - "core.duplex_pipeline.settings.tools", + "runtime.pipeline.duplex.settings.tools", [ "current_time", "calculator", @@ -141,7 +141,7 @@ def test_pipeline_uses_default_tools_from_settings(monkeypatch): def test_pipeline_exposes_unknown_string_tools_with_fallback_schema(monkeypatch): - monkeypatch.setattr("core.duplex_pipeline.settings.tools", ["custom_system_cmd"]) + monkeypatch.setattr("runtime.pipeline.duplex.settings.tools", ["custom_system_cmd"]) pipeline, _events = _build_pipeline(monkeypatch, [[LLMStreamEvent(type="done")]]) schemas = pipeline._resolved_tool_schemas() @@ -151,7 +151,7 @@ def test_pipeline_exposes_unknown_string_tools_with_fallback_schema(monkeypatch) def test_pipeline_assigns_default_client_executor_for_system_string_tools(monkeypatch): - monkeypatch.setattr("core.duplex_pipeline.settings.tools", ["increase_volume"]) + monkeypatch.setattr("runtime.pipeline.duplex.settings.tools", ["increase_volume"]) pipeline, _events = _build_pipeline(monkeypatch, [[LLMStreamEvent(type="done")]]) tool_call = { @@ -221,9 +221,9 @@ async def test_pipeline_applies_default_args_to_tool_call(monkeypatch): @pytest.mark.asyncio async def test_generated_opener_prompt_uses_system_prompt_only(monkeypatch): - monkeypatch.setattr("core.duplex_pipeline.SileroVAD", _DummySileroVAD) - monkeypatch.setattr("core.duplex_pipeline.VADProcessor", _DummyVADProcessor) - monkeypatch.setattr("core.duplex_pipeline.EouDetector", _DummyEouDetector) + monkeypatch.setattr("runtime.pipeline.duplex.SileroVAD", _DummySileroVAD) + monkeypatch.setattr("runtime.pipeline.duplex.VADProcessor", _DummyVADProcessor) + monkeypatch.setattr("runtime.pipeline.duplex.EouDetector", _DummyEouDetector) llm = _CaptureGenerateLLM("你好") pipeline = DuplexPipeline( @@ -662,7 +662,7 @@ async def test_server_tool_timeout_emits_504_and_continues(monkeypatch): "status": {"code": 200, "message": "ok"}, } - monkeypatch.setattr("core.duplex_pipeline.execute_server_tool", _slow_execute) + monkeypatch.setattr("runtime.pipeline.duplex.execute_server_tool", _slow_execute) pipeline, events = _build_pipeline( monkeypatch, diff --git a/engine/tests/test_tool_executor.py b/engine/tests/test_tool_executor.py index 17345c7..aada0c1 100644 --- a/engine/tests/test_tool_executor.py +++ b/engine/tests/test_tool_executor.py @@ -1,6 +1,6 @@ import pytest -from core.tool_executor import execute_server_tool +from tools.executor import execute_server_tool @pytest.mark.asyncio @@ -38,7 +38,7 @@ async def test_current_time_uses_local_system_clock(monkeypatch): async def _should_not_be_called(_tool_id): raise AssertionError("fetch_tool_resource should not be called for current_time") - monkeypatch.setattr("core.tool_executor.fetch_tool_resource", _should_not_be_called) + monkeypatch.setattr("tools.executor.fetch_tool_resource", _should_not_be_called) result = await execute_server_tool( { diff --git a/engine/tests/test_ws_protocol_session_start.py b/engine/tests/test_ws_protocol_session_start.py index ad68674..e055c75 100644 --- a/engine/tests/test_ws_protocol_session_start.py +++ b/engine/tests/test_ws_protocol_session_start.py @@ -1,7 +1,7 @@ import pytest -from core.session import Session, WsSessionState -from models.ws_v1 import OutputAudioPlayedMessage, SessionStartMessage, parse_client_message +from runtime.session.manager import Session, WsSessionState +from protocol.ws_v1.schema import OutputAudioPlayedMessage, SessionStartMessage, parse_client_message def _session() -> Session: @@ -194,7 +194,7 @@ async def test_handle_session_start_requires_assistant_id_and_closes_transport() @pytest.mark.asyncio async def test_handle_session_start_applies_whitelisted_overrides_and_ignores_workflow(monkeypatch): - monkeypatch.setattr("core.session.settings.ws_emit_config_resolved", False) + monkeypatch.setattr("runtime.session.manager.settings.ws_emit_config_resolved", False) session = Session.__new__(Session) session.id = "sess_start_ok" @@ -289,9 +289,9 @@ async def test_handle_session_start_applies_whitelisted_overrides_and_ignores_wo @pytest.mark.asyncio async def test_handle_session_start_emits_config_resolved_when_enabled(monkeypatch): - monkeypatch.setattr("core.session.settings.ws_emit_config_resolved", True) - monkeypatch.setattr("core.session.settings.ws_protocol_version", "v1-custom") - monkeypatch.setattr("core.session.settings.default_codec", "pcmu") + monkeypatch.setattr("runtime.session.manager.settings.ws_emit_config_resolved", True) + monkeypatch.setattr("runtime.session.manager.settings.ws_protocol_version", "v1-custom") + monkeypatch.setattr("runtime.session.manager.settings.default_codec", "pcmu") session = Session.__new__(Session) session.id = "sess_start_emit_config" @@ -385,8 +385,8 @@ async def test_handle_session_start_emits_config_resolved_when_enabled(monkeypat @pytest.mark.asyncio async def test_handle_audio_uses_chunk_size_for_frame_validation(monkeypatch): - monkeypatch.setattr("core.session.settings.sample_rate", 16000) - monkeypatch.setattr("core.session.settings.chunk_size_ms", 10) + monkeypatch.setattr("runtime.session.manager.settings.sample_rate", 16000) + monkeypatch.setattr("runtime.session.manager.settings.chunk_size_ms", 10) session = Session.__new__(Session) session.id = "sess_chunk_frame" diff --git a/engine/tools/__init__.py b/engine/tools/__init__.py new file mode 100644 index 0000000..29a67f0 --- /dev/null +++ b/engine/tools/__init__.py @@ -0,0 +1 @@ +"""Tools package.""" diff --git a/engine/core/tool_executor.py b/engine/tools/executor.py similarity index 99% rename from engine/core/tool_executor.py rename to engine/tools/executor.py index 899d930..0049cbc 100644 --- a/engine/core/tool_executor.py +++ b/engine/tools/executor.py @@ -8,7 +8,7 @@ from typing import Any, Awaitable, Callable, Dict, Optional import aiohttp -from app.backend_adapters import build_backend_adapter_from_settings +from adapters.control_plane.backend import build_backend_adapter_from_settings ToolResourceFetcher = Callable[[str], Awaitable[Optional[Dict[str, Any]]]] diff --git a/engine/workflow/__init__.py b/engine/workflow/__init__.py new file mode 100644 index 0000000..35ffe2e --- /dev/null +++ b/engine/workflow/__init__.py @@ -0,0 +1 @@ +"""Workflow package.""" diff --git a/engine/core/workflow_runner.py b/engine/workflow/runner.py similarity index 100% rename from engine/core/workflow_runner.py rename to engine/workflow/runner.py