Compare commits

...

2 Commits

Author SHA1 Message Date
Xin Wang
7e0b777923 Refactor project structure and enhance backend integration
- Expanded package inclusion in `pyproject.toml` to support new modules.
- Introduced new `adapters` and `protocol` packages for better organization.
- Added backend adapter implementations for control plane integration.
- Updated main application imports to reflect new package structure.
- Removed deprecated core components and adjusted documentation accordingly.
- Enhanced architecture documentation to clarify the new runtime and integration layers.
2026-03-06 09:51:56 +08:00
Xin Wang
4e2450e800 Refactor backend integration and service architecture
- Removed the backend client compatibility wrapper and associated methods to streamline backend integration.
- Updated session management to utilize control plane gateways and runtime configuration providers.
- Adjusted TTS service implementations to remove the EdgeTTS service and simplify service dependencies.
- Enhanced documentation to reflect changes in backend integration and service architecture.
- Updated configuration files to remove deprecated TTS provider options and clarify available settings.
2026-03-06 09:00:43 +08:00
82 changed files with 896 additions and 1130 deletions

View File

@@ -0,0 +1 @@
"""Adapters package."""

View File

@@ -0,0 +1 @@
"""Control-plane adapters package."""

View File

@@ -1,87 +0,0 @@
"""Compatibility wrappers around backend adapter implementations."""
from __future__ import annotations
from typing import Any, Dict, List, Optional
from app.backend_adapters import build_backend_adapter_from_settings
def _adapter():
    """Build a fresh backend adapter instance from the active settings."""
    adapter = build_backend_adapter_from_settings()
    return adapter
async def fetch_assistant_config(assistant_id: str) -> Optional[Dict[str, Any]]:
    """Retrieve the assistant configuration payload through the backend adapter."""
    adapter = _adapter()
    return await adapter.fetch_assistant_config(assistant_id)
async def create_history_call_record(
    *,
    user_id: int,
    assistant_id: Optional[str],
    source: str = "debug",
) -> Optional[str]:
    """Create a call record via the backend history API.

    Returns the new call_id, or None per the adapter's contract.
    """
    adapter = _adapter()
    return await adapter.create_call_record(
        user_id=user_id,
        assistant_id=assistant_id,
        source=source,
    )
async def add_history_transcript(
    *,
    call_id: str,
    turn_index: int,
    speaker: str,
    content: str,
    start_ms: int,
    end_ms: int,
    confidence: Optional[float] = None,
    duration_ms: Optional[int] = None,
) -> bool:
    """Forward one transcript segment to the backend history store.

    Returns the adapter's success flag.
    """
    adapter = _adapter()
    return await adapter.add_transcript(
        call_id=call_id,
        turn_index=turn_index,
        speaker=speaker,
        content=content,
        start_ms=start_ms,
        end_ms=end_ms,
        confidence=confidence,
        duration_ms=duration_ms,
    )
async def finalize_history_call_record(
    *,
    call_id: str,
    status: str,
    duration_seconds: int,
) -> bool:
    """Mark a call record as finished, recording its status and duration."""
    adapter = _adapter()
    return await adapter.finalize_call_record(
        call_id=call_id,
        status=status,
        duration_seconds=duration_seconds,
    )
async def search_knowledge_context(
    *,
    kb_id: str,
    query: str,
    n_results: int = 5,
) -> List[Dict[str, Any]]:
    """Query the backend knowledge base and return its retrieval results."""
    adapter = _adapter()
    return await adapter.search_knowledge_context(
        kb_id=kb_id,
        query=query,
        n_results=n_results,
    )
async def fetch_tool_resource(tool_id: str) -> Optional[Dict[str, Any]]:
    """Look up a tool resource configuration through the backend adapter."""
    adapter = _adapter()
    return await adapter.fetch_tool_resource(tool_id)

View File

@@ -71,7 +71,7 @@ class Settings(BaseSettings):
# TTS Configuration
tts_provider: str = Field(
default="openai_compatible",
description="TTS provider (edge, openai_compatible, siliconflow, dashscope)"
description="TTS provider (openai_compatible, siliconflow, dashscope)"
)
tts_api_url: Optional[str] = Field(default=None, description="TTS provider API URL")
tts_model: Optional[str] = Field(default=None, description="TTS model name")

View File

@@ -20,11 +20,11 @@ except ImportError:
logger.warning("aiortc not available - WebRTC endpoint will be disabled")
from app.config import settings
from app.backend_adapters import build_backend_adapter_from_settings
from core.transports import SocketTransport, WebRtcTransport, BaseTransport
from core.session import Session
from adapters.control_plane.backend import build_backend_adapter_from_settings
from runtime.transports import SocketTransport, WebRtcTransport, BaseTransport
from runtime.session.manager import Session
from processors.tracks import Resampled16kTrack
from core.events import get_event_bus, reset_event_bus
from runtime.events import get_event_bus, reset_event_bus
# Check interval for heartbeat/timeout (seconds)
_HEARTBEAT_CHECK_INTERVAL_SEC = 5
@@ -76,7 +76,7 @@ app.add_middleware(
# Active sessions storage
active_sessions: Dict[str, Session] = {}
backend_gateway = build_backend_adapter_from_settings()
control_plane_gateway = build_backend_adapter_from_settings()
# Configure logging
logger.remove()
@@ -187,7 +187,7 @@ async def websocket_endpoint(websocket: WebSocket):
session = Session(
session_id,
transport,
backend_gateway=backend_gateway,
control_plane_gateway=control_plane_gateway,
assistant_id=assistant_id,
)
active_sessions[session_id] = session
@@ -272,7 +272,7 @@ async def webrtc_endpoint(websocket: WebSocket):
session = Session(
session_id,
transport,
backend_gateway=backend_gateway,
control_plane_gateway=control_plane_gateway,
assistant_id=assistant_id,
)
active_sessions[session_id] = session

View File

@@ -21,7 +21,7 @@ agent:
api_url: https://api.qnaigc.com/v1
tts:
# provider: edge | openai_compatible | siliconflow | dashscope
# provider: openai_compatible | siliconflow | dashscope
# dashscope defaults (if omitted):
# api_url: wss://dashscope.aliyuncs.com/api-ws/v1/realtime
# model: qwen3-tts-flash-realtime

View File

@@ -18,7 +18,7 @@ agent:
api_url: https://api.qnaigc.com/v1
tts:
# provider: edge | openai_compatible | siliconflow | dashscope
# provider: openai_compatible | siliconflow | dashscope
# dashscope defaults (if omitted):
# api_url: wss://dashscope.aliyuncs.com/api-ws/v1/realtime
# model: qwen3-tts-flash-realtime

View File

@@ -1,20 +0,0 @@
"""Core Components Package"""
from core.events import EventBus, get_event_bus
from core.transports import BaseTransport, SocketTransport, WebRtcTransport
from core.session import Session
from core.conversation import ConversationManager, ConversationState, ConversationTurn
from core.duplex_pipeline import DuplexPipeline
__all__ = [
"EventBus",
"get_event_bus",
"BaseTransport",
"SocketTransport",
"WebRtcTransport",
"Session",
"ConversationManager",
"ConversationState",
"ConversationTurn",
"DuplexPipeline",
]

View File

@@ -1,17 +0,0 @@
"""Port interfaces for engine-side integration boundaries."""
from core.ports.backend import (
AssistantConfigProvider,
BackendGateway,
HistoryWriter,
KnowledgeSearcher,
ToolResourceResolver,
)
__all__ = [
"AssistantConfigProvider",
"BackendGateway",
"HistoryWriter",
"KnowledgeSearcher",
"ToolResourceResolver",
]

View File

@@ -27,16 +27,15 @@ Assistant config source behavior:
## Architecture
- Ports: `core/ports/backend.py`
- Adapters: `app/backend_adapters.py`
- Compatibility wrappers: `app/backend_client.py`
- Ports: `runtime/ports/control_plane.py`
- Adapters: `adapters/control_plane/backend.py`
`Session` and `DuplexPipeline` receive backend capabilities via injected adapter
methods instead of hard-coding backend client imports.
## Async History Writes
Session history persistence is handled by `core/history_bridge.py`.
Session history persistence is handled by `runtime/history/bridge.py`.
Design:

View File

@@ -0,0 +1,47 @@
# Engine Extension Ports (Draft)
This document defines the draft port set used to keep the core runtime extensible.
## Port Modules
- `runtime/ports/control_plane.py`
- `AssistantRuntimeConfigProvider`
- `ConversationHistoryStore`
- `KnowledgeRetriever`
- `ToolCatalog`
- `ControlPlaneGateway`
- `runtime/ports/llm.py`
- `LLMServiceSpec`
- `LLMPort`
- optional extensions: `LLMCancellable`, `LLMRuntimeConfigurable`
- `runtime/ports/tts.py`
- `TTSServiceSpec`
- `TTSPort`
- `runtime/ports/asr.py`
- `ASRServiceSpec`
- `ASRPort`
- optional extensions: `ASRInterimControl`, `ASRBufferControl`
- `runtime/ports/service_factory.py`
- `RealtimeServiceFactory`
## Adapter Layer
- `providers/factory/default.py` provides `DefaultRealtimeServiceFactory`.
- It maps resolved provider specs to concrete adapters.
- Runtime orchestration (`runtime/pipeline/duplex.py`) depends on the factory port/specs, not concrete provider classes.
## Provider Behavior (Current)
- LLM:
- supported providers: `openai`, `openai_compatible`, `openai-compatible`, `siliconflow`
- fallback: `MockLLMService`
- TTS:
- supported providers: `dashscope`, `openai_compatible`, `openai-compatible`, `siliconflow`
- fallback: `MockTTSService`
- ASR:
- supported providers: `openai_compatible`, `openai-compatible`, `siliconflow`
- fallback: `BufferedASRService`
## Notes
- This is a draft contract set; follow-up work can add explicit capability negotiation and contract-version fields.

View File

@@ -0,0 +1,129 @@
# Engine High-Level Architecture
This document describes the runtime architecture of `engine` for realtime voice/text assistant interactions.
## Goals
- Low-latency duplex interaction (user speaks while assistant can respond)
- Clear separation between transport, orchestration, and model/service integrations
- Backend-optional runtime (works with or without external backend)
- Protocol-first interoperability through strict WS v1 control messages
## Top-Level Components
```mermaid
flowchart LR
C[Client\nWeb / Mobile / Device] <-- WS v1 + PCM --> A[FastAPI App\napp/main.py]
A --> S[Session\nruntime/session/manager.py]
S --> D[Duplex Pipeline\nruntime/pipeline/duplex.py]
D --> P[Processors\nVAD / EOU / Tracks]
D --> R[Workflow Runner\nworkflow/runner.py]
D --> E[Event Bus + Models\nruntime/events.py + protocol/ws_v1/*]
R --> SV[Service Layer\nproviders/asr/*\nproviders/llm/*\nproviders/tts/*]
R --> TE[Tool Executor\ntools/executor.py]
S --> HB[History Bridge\nruntime/history/bridge.py]
S --> BA[Control Plane Port\nruntime/ports/control_plane.py]
BA --> AD[Adapters\nadapters/control_plane/backend.py]
AD --> B[(External Backend API\noptional)]
SV --> M[(ASR/LLM/TTS Providers)]
```
## Request Lifecycle (Simplified)
1. Client connects to `/ws?assistant_id=<id>` and sends `session.start`.
2. App creates a `Session` with resolved assistant config (backend or local YAML).
3. Binary PCM frames enter the duplex pipeline.
4. `VAD`/`EOU` processors detect speech segments and trigger ASR finalization.
5. ASR text is routed into workflow + LLM generation.
6. Optional tool calls are executed (server-side or client-side result return).
7. LLM output streams as text deltas; TTS produces audio chunks for playback.
8. Session emits structured events (`transcript.*`, `assistant.*`, `output.audio.*`, `error`).
9. History bridge persists conversation data asynchronously.
10. On `session.stop` (or disconnect), session finalizes and drains pending writes.
## Layering and Responsibilities
### 1) Transport / API Layer
- Entry point: `app/main.py`
- Responsibilities:
- WebSocket lifecycle management
- WS v1 message validation and order guarantees
- Session creation and teardown
- Converting raw WS frames into internal events
### 2) Session + Orchestration Layer
- Core: `runtime/session/manager.py`, `runtime/pipeline/duplex.py`, `runtime/conversation.py`
- Responsibilities:
- Per-session state machine
- Turn boundaries and interruption/cancel handling
- Event sequencing (`seq`) and envelope consistency
- Bridging input/output tracks (`audio_in`, `audio_out`, `control`)
### 3) Processing Layer
- Modules: `processors/vad.py`, `processors/eou.py`, `processors/tracks.py`
- Responsibilities:
- Speech activity detection
- End-of-utterance decisioning
- Track-oriented routing and timing-sensitive pre/post processing
### 4) Workflow + Tooling Layer
- Modules: `workflow/runner.py`, `tools/executor.py`
- Responsibilities:
- Assistant workflow execution
- Tool call planning/execution and timeout handling
- Tool result normalization into protocol events
### 5) Service Integration Layer
- Modules: `providers/*`
- Responsibilities:
- Abstracting ASR/LLM/TTS provider differences
- Streaming token/audio adaptation
- Provider-specific adapters (OpenAI-compatible, DashScope, SiliconFlow, etc.)
### 6) Backend Integration Layer (Optional)
- Port: `runtime/ports/control_plane.py`
- Adapters: `adapters/control_plane/backend.py`
- Responsibilities:
- Fetching assistant runtime config
- Persisting call/session metadata and history
- Supporting `BACKEND_MODE=auto|http|disabled`
### 7) Persistence / Reliability Layer
- Module: `runtime/history/bridge.py`
- Responsibilities:
- Non-blocking queue-based history writes
- Retry with backoff on backend failures
- Best-effort drain on session finalize
## Key Design Principles
- Dependency inversion for backend: session/pipeline depend on port interfaces, not concrete clients.
- Streaming-first: text/audio are emitted incrementally to minimize perceived latency.
- Fail-soft behavior: backend/history failures should not block realtime interaction paths.
- Protocol strictness: WS v1 rejects malformed/out-of-order control traffic early.
- Explicit event model: all client-observable state changes are represented as typed events.
## Configuration Boundaries
- Runtime environment settings live in `app/config.py`.
- Assistant-specific behavior is loaded by `assistant_id`:
- backend mode: from backend API
- engine-only mode: local `engine/config/agents/<assistant_id>.yaml`
- Client-provided `metadata.overrides` and `dynamicVariables` can alter runtime behavior within protocol constraints.
## Related Docs
- WS protocol: `engine/docs/ws_v1_schema.md`
- Backend integration details: `engine/docs/backend_integration.md`
- Duplex interaction diagram: `engine/docs/duplex_interaction.svg`

View File

@@ -0,0 +1,21 @@
# Canonical Module Layout
This MVP uses a single canonical module layout without legacy import shims.
## Runtime and protocol
- `protocol.ws_v1.schema`
- `runtime.session.manager`
- `runtime.pipeline.duplex`
- `runtime.history.bridge`
- `runtime.events`
- `runtime.transports`
- `runtime.conversation`
- `runtime.ports.*`
## Integrations and orchestration
- `providers.*`
- `adapters.control_plane.backend`
- `workflow.runner`
- `tools.executor`

View File

@@ -7,9 +7,9 @@
- 握手顺序、状态机、错误语义与实现细节。
实现对照来源:
- `models/ws_v1.py`
- `core/session.py`
- `core/duplex_pipeline.py`
- `protocol/ws_v1/schema.py`
- `runtime/session/manager.py`
- `runtime/pipeline/duplex.py`
- `app/main.py`
---

View File

@@ -1 +0,0 @@
"""Data Models Package"""

View File

@@ -1,143 +0,0 @@
"""Protocol command models matching the original active-call API."""
from typing import Optional, Dict, Any
from pydantic import BaseModel, Field
class InviteCommand(BaseModel):
    """Invite command to initiate a call."""

    # Discriminator field; always "invite" for this command.
    command: str = Field(default="invite", description="Command type")
    option: Optional[Dict[str, Any]] = Field(default=None, description="Call configuration options")
class AcceptCommand(BaseModel):
    """Accept command to accept an incoming call."""

    # Discriminator field; always "accept" for this command.
    command: str = Field(default="accept", description="Command type")
    option: Optional[Dict[str, Any]] = Field(default=None, description="Call configuration options")
class RejectCommand(BaseModel):
    """Reject command to reject an incoming call."""

    # Discriminator field; always "reject" for this command.
    command: str = Field(default="reject", description="Command type")
    reason: str = Field(default="", description="Reason for rejection")
    code: Optional[int] = Field(default=None, description="SIP response code")
class RingingCommand(BaseModel):
    """Ringing command to send ringing response."""

    # Discriminator field; always "ringing" for this command.
    command: str = Field(default="ringing", description="Command type")
    recorder: Optional[Dict[str, Any]] = Field(default=None, description="Call recording configuration")
    early_media: bool = Field(default=False, description="Enable early media")
    ringtone: Optional[str] = Field(default=None, description="Custom ringtone URL")
class TTSCommand(BaseModel):
    """TTS command to convert text to speech.

    Supports both one-shot and streaming text input; `end_of_stream`
    terminates a streaming sequence.
    """

    # Discriminator field; always "tts" for this command.
    command: str = Field(default="tts", description="Command type")
    text: str = Field(..., description="Text to synthesize")
    speaker: Optional[str] = Field(default=None, description="Speaker voice name")
    play_id: Optional[str] = Field(default=None, description="Unique identifier for this TTS session")
    auto_hangup: bool = Field(default=False, description="Auto hangup after TTS completion")
    streaming: bool = Field(default=False, description="Streaming text input")
    end_of_stream: bool = Field(default=False, description="End of streaming input")
    wait_input_timeout: Optional[int] = Field(default=None, description="Max time to wait for input (seconds)")
    option: Optional[Dict[str, Any]] = Field(default=None, description="TTS provider specific options")
class PlayCommand(BaseModel):
    """Play command to play audio from URL."""

    # Discriminator field; always "play" for this command.
    command: str = Field(default="play", description="Command type")
    url: str = Field(..., description="URL of audio file to play")
    auto_hangup: bool = Field(default=False, description="Auto hangup after playback")
    wait_input_timeout: Optional[int] = Field(default=None, description="Max time to wait for input (seconds)")
class InterruptCommand(BaseModel):
    """Interrupt command to interrupt current playback."""

    # Discriminator field; always "interrupt" for this command.
    command: str = Field(default="interrupt", description="Command type")
    graceful: bool = Field(default=False, description="Wait for current TTS to complete")
class PauseCommand(BaseModel):
    """Pause command to pause current playback."""

    # Discriminator field; always "pause" for this command.
    command: str = Field(default="pause", description="Command type")
class ResumeCommand(BaseModel):
    """Resume command to resume paused playback."""

    # Discriminator field; always "resume" for this command.
    command: str = Field(default="resume", description="Command type")
class HangupCommand(BaseModel):
    """Hangup command to end the call."""

    # Discriminator field; always "hangup" for this command.
    command: str = Field(default="hangup", description="Command type")
    reason: Optional[str] = Field(default=None, description="Reason for hangup")
    initiator: Optional[str] = Field(default=None, description="Who initiated the hangup")
class HistoryCommand(BaseModel):
    """History command to add conversation history."""

    # Discriminator field; always "history" for this command.
    command: str = Field(default="history", description="Command type")
    speaker: str = Field(..., description="Speaker identifier")
    text: str = Field(..., description="Conversation text")
class ChatCommand(BaseModel):
    """Chat command for text-based conversation."""

    # Discriminator field; always "chat" for this command.
    command: str = Field(default="chat", description="Command type")
    text: str = Field(..., description="Chat text message")
# Command type mapping: wire-level "command" string -> pydantic model class.
# Consumed by parse_command() to dispatch incoming payloads.
COMMAND_TYPES = {
    "invite": InviteCommand,
    "accept": AcceptCommand,
    "reject": RejectCommand,
    "ringing": RingingCommand,
    "tts": TTSCommand,
    "play": PlayCommand,
    "interrupt": InterruptCommand,
    "pause": PauseCommand,
    "resume": ResumeCommand,
    "hangup": HangupCommand,
    "history": HistoryCommand,
    "chat": ChatCommand,
}
def parse_command(data: Dict[str, Any]) -> BaseModel:
    """Parse a command payload into its typed model.

    Args:
        data: JSON data as dictionary; must carry a "command" field.

    Returns:
        Parsed command model.

    Raises:
        ValueError: If the command field is missing or the type is unknown.
    """
    command_type = data.get("command")
    if not command_type:
        raise ValueError("Missing 'command' field")
    try:
        model_cls = COMMAND_TYPES[command_type]
    except KeyError:
        raise ValueError(f"Unknown command type: {command_type}") from None
    return model_cls(**data)

View File

@@ -1,126 +0,0 @@
"""Configuration models for call options."""
from typing import Optional, Dict, Any, List
from pydantic import BaseModel, Field
class VADOption(BaseModel):
    """Voice Activity Detection configuration."""

    # Algorithm selection; "silero" is the default engine.
    type: str = Field(default="silero", description="VAD algorithm type (silero, webrtc)")
    samplerate: int = Field(default=16000, description="Audio sample rate for VAD")
    speech_padding: int = Field(default=250, description="Speech padding in milliseconds")
    silence_padding: int = Field(default=100, description="Silence padding in milliseconds")
    ratio: float = Field(default=0.5, description="Voice detection ratio threshold")
    voice_threshold: float = Field(default=0.5, description="Voice energy threshold")
    max_buffer_duration_secs: int = Field(default=50, description="Maximum buffer duration in seconds")
    silence_timeout: Optional[int] = Field(default=None, description="Silence timeout in milliseconds")
    # Remote-VAD fields; only relevant when an external service is used.
    endpoint: Optional[str] = Field(default=None, description="Custom VAD service endpoint")
    secret_key: Optional[str] = Field(default=None, description="VAD service secret key")
    secret_id: Optional[str] = Field(default=None, description="VAD service secret ID")
class ASROption(BaseModel):
    """Automatic Speech Recognition configuration."""

    # Provider name is required; all other fields are provider-dependent.
    provider: str = Field(..., description="ASR provider (tencent, aliyun, openai, etc.)")
    language: Optional[str] = Field(default=None, description="Language code (zh-CN, en-US)")
    app_id: Optional[str] = Field(default=None, description="Application ID")
    secret_id: Optional[str] = Field(default=None, description="Secret ID for authentication")
    secret_key: Optional[str] = Field(default=None, description="Secret key for authentication")
    model_type: Optional[str] = Field(default=None, description="ASR model type (16k_zh, 8k_en)")
    buffer_size: Optional[int] = Field(default=None, description="Audio buffer size in bytes")
    samplerate: Optional[int] = Field(default=None, description="Audio sample rate")
    endpoint: Optional[str] = Field(default=None, description="Custom ASR service endpoint")
    extra: Optional[Dict[str, Any]] = Field(default=None, description="Additional parameters")
    start_when_answer: bool = Field(default=False, description="Start ASR when call is answered")
class TTSOption(BaseModel):
    """Text-to-Speech configuration."""

    samplerate: Optional[int] = Field(default=None, description="TTS output sample rate")
    # Default provider is msedge per this legacy schema.
    provider: str = Field(default="msedge", description="TTS provider (tencent, aliyun, deepgram, msedge)")
    speed: float = Field(default=1.0, description="Speech speed multiplier")
    app_id: Optional[str] = Field(default=None, description="Application ID")
    secret_id: Optional[str] = Field(default=None, description="Secret ID for authentication")
    secret_key: Optional[str] = Field(default=None, description="Secret key for authentication")
    volume: Optional[int] = Field(default=None, description="Speech volume level (1-10)")
    speaker: Optional[str] = Field(default=None, description="Voice speaker name")
    codec: Optional[str] = Field(default=None, description="Audio codec")
    subtitle: bool = Field(default=False, description="Enable subtitle generation")
    emotion: Optional[str] = Field(default=None, description="Speech emotion")
    endpoint: Optional[str] = Field(default=None, description="Custom TTS service endpoint")
    extra: Optional[Dict[str, Any]] = Field(default=None, description="Additional parameters")
    max_concurrent_tasks: Optional[int] = Field(default=None, description="Max concurrent tasks")
class RecorderOption(BaseModel):
    """Call recording configuration."""

    # The recording file path is the only required field.
    recorder_file: str = Field(..., description="Path to recording file")
    samplerate: int = Field(default=16000, description="Recording sample rate")
    ptime: int = Field(default=200, description="Packet time in milliseconds")
class MediaPassOption(BaseModel):
    """Media pass-through configuration for external audio processing."""

    # WebSocket endpoint receiving/producing the media stream.
    url: str = Field(..., description="WebSocket URL for media streaming")
    input_sample_rate: int = Field(default=16000, description="Sample rate of audio received from WebSocket")
    output_sample_rate: int = Field(default=16000, description="Sample rate of audio sent to WebSocket")
    packet_size: int = Field(default=2560, description="Packet size in bytes")
    ptime: Optional[int] = Field(default=None, description="Buffered playback period in milliseconds")
class SipOption(BaseModel):
    """SIP protocol configuration."""

    username: Optional[str] = Field(default=None, description="SIP username")
    password: Optional[str] = Field(default=None, description="SIP password")
    realm: Optional[str] = Field(default=None, description="SIP realm/domain")
    headers: Optional[Dict[str, str]] = Field(default=None, description="Additional SIP headers")
class HandlerRule(BaseModel):
    """Handler routing rule.

    Matches calls by caller/callee pattern and routes them to a playbook
    or webhook.
    """

    caller: Optional[str] = Field(default=None, description="Caller pattern (regex)")
    callee: Optional[str] = Field(default=None, description="Callee pattern (regex)")
    playbook: Optional[str] = Field(default=None, description="Playbook file path")
    webhook: Optional[str] = Field(default=None, description="Webhook URL")
class CallOption(BaseModel):
    """Comprehensive call configuration options.

    Aggregates per-call settings for audio, recording, ASR/VAD/TTS,
    media pass-through, SIP, and timeout behavior.
    """

    # Basic options
    denoise: bool = Field(default=False, description="Enable noise reduction")
    offer: Optional[str] = Field(default=None, description="SDP offer string")
    callee: Optional[str] = Field(default=None, description="Callee SIP URI or phone number")
    caller: Optional[str] = Field(default=None, description="Caller SIP URI or phone number")
    # Audio codec
    codec: str = Field(default="pcm", description="Audio codec (pcm, pcma, pcmu, g722)")
    # Component configurations
    recorder: Optional[RecorderOption] = Field(default=None, description="Call recording config")
    asr: Optional[ASROption] = Field(default=None, description="ASR configuration")
    vad: Optional[VADOption] = Field(default=None, description="VAD configuration")
    tts: Optional[TTSOption] = Field(default=None, description="TTS configuration")
    media_pass: Optional[MediaPassOption] = Field(default=None, description="Media pass-through config")
    sip: Optional[SipOption] = Field(default=None, description="SIP configuration")
    # Timeouts and networking
    handshake_timeout: Optional[int] = Field(default=None, description="Handshake timeout in seconds")
    enable_ipv6: bool = Field(default=False, description="Enable IPv6 support")
    inactivity_timeout: Optional[int] = Field(default=None, description="Inactivity timeout in seconds")
    # EOU configuration
    eou: Optional[Dict[str, Any]] = Field(default=None, description="End of utterance detection config")
    # Extra parameters
    extra: Optional[Dict[str, Any]] = Field(default=None, description="Additional custom parameters")

    class Config:
        # Allow population by field name as well as alias.
        populate_by_name = True

View File

@@ -1,231 +0,0 @@
"""Protocol event models matching the original active-call API."""
from typing import Optional, Dict, Any
from pydantic import BaseModel, Field
from datetime import datetime
def current_timestamp_ms() -> int:
    """Return the current Unix time, truncated to whole milliseconds."""
    seconds = datetime.now().timestamp()
    return int(seconds * 1000)
# Base Event Model
class BaseEvent(BaseModel):
    """Base event model shared by track-scoped events."""

    # Event discriminator; subclasses set a concrete default.
    event: str = Field(..., description="Event type")
    track_id: str = Field(..., description="Unique track identifier")
    timestamp: int = Field(default_factory=current_timestamp_ms, description="Event timestamp in milliseconds")
# Lifecycle Events
class IncomingEvent(BaseEvent):
    """Incoming call event (SIP only)."""

    event: str = Field(default="incoming", description="Event type")
    caller: Optional[str] = Field(default=None, description="Caller's SIP URI")
    callee: Optional[str] = Field(default=None, description="Callee's SIP URI")
    sdp: Optional[str] = Field(default=None, description="SDP offer from caller")
class AnswerEvent(BaseEvent):
    """Call answered event."""

    event: str = Field(default="answer", description="Event type")
    sdp: Optional[str] = Field(default=None, description="SDP answer from server")
class RejectEvent(BaseEvent):
    """Call rejected event."""

    event: str = Field(default="reject", description="Event type")
    reason: Optional[str] = Field(default=None, description="Rejection reason")
    code: Optional[int] = Field(default=None, description="SIP response code")
class RingingEvent(BaseEvent):
    """Call ringing event."""

    event: str = Field(default="ringing", description="Event type")
    early_media: bool = Field(default=False, description="Early media available")
class HangupEvent(BaseModel):
    """Call hangup event.

    Note: extends BaseModel (not BaseEvent) — hangup is call-scoped, so it
    carries no track_id.
    """

    event: str = Field(default="hangup", description="Event type")
    timestamp: int = Field(default_factory=current_timestamp_ms, description="Event timestamp")
    reason: Optional[str] = Field(default=None, description="Hangup reason")
    initiator: Optional[str] = Field(default=None, description="Who initiated hangup")
    start_time: Optional[str] = Field(default=None, description="Call start time (ISO 8601)")
    hangup_time: Optional[str] = Field(default=None, description="Hangup time (ISO 8601)")
    answer_time: Optional[str] = Field(default=None, description="Answer time (ISO 8601)")
    ringing_time: Optional[str] = Field(default=None, description="Ringing time (ISO 8601)")
    # "from" is a Python keyword, hence the trailing-underscore name + alias.
    from_: Optional[Dict[str, Any]] = Field(default=None, alias="from", description="Caller info")
    to: Optional[Dict[str, Any]] = Field(default=None, description="Callee info")
    extra: Optional[Dict[str, Any]] = Field(default=None, description="Additional metadata")

    class Config:
        # Allow population by field name as well as alias.
        populate_by_name = True
# VAD Events
class SpeakingEvent(BaseEvent):
    """Speech detected event."""

    event: str = Field(default="speaking", description="Event type")
    start_time: int = Field(default_factory=current_timestamp_ms, description="Speech start time")
class SilenceEvent(BaseEvent):
    """Silence detected event."""

    event: str = Field(default="silence", description="Event type")
    start_time: int = Field(default_factory=current_timestamp_ms, description="Silence start time")
    duration: int = Field(default=0, description="Silence duration in milliseconds")
# AI/ASR Events
class AsrFinalEvent(BaseEvent):
    """ASR final transcription event."""

    event: str = Field(default="asrFinal", description="Event type")
    index: int = Field(..., description="ASR result sequence number")
    start_time: Optional[int] = Field(default=None, description="Speech start time")
    end_time: Optional[int] = Field(default=None, description="Speech end time")
    text: str = Field(..., description="Transcribed text")
class AsrDeltaEvent(BaseEvent):
    """ASR partial transcription event (streaming)."""

    event: str = Field(default="asrDelta", description="Event type")
    index: int = Field(..., description="ASR result sequence number")
    start_time: Optional[int] = Field(default=None, description="Speech start time")
    end_time: Optional[int] = Field(default=None, description="Speech end time")
    text: str = Field(..., description="Partial transcribed text")
class EouEvent(BaseEvent):
    """End of utterance detection event."""

    event: str = Field(default="eou", description="Event type")
    completed: bool = Field(default=True, description="Whether utterance was completed")
# Audio Track Events
class TrackStartEvent(BaseEvent):
    """Audio track start event."""

    event: str = Field(default="trackStart", description="Event type")
    play_id: Optional[str] = Field(default=None, description="Play ID from TTS/Play command")
class TrackEndEvent(BaseEvent):
    """Audio track end event."""

    event: str = Field(default="trackEnd", description="Event type")
    duration: int = Field(..., description="Track duration in milliseconds")
    ssrc: int = Field(..., description="RTP SSRC identifier")
    play_id: Optional[str] = Field(default=None, description="Play ID from TTS/Play command")
class InterruptionEvent(BaseEvent):
    """Playback interruption event."""

    event: str = Field(default="interruption", description="Event type")
    play_id: Optional[str] = Field(default=None, description="Play ID that was interrupted")
    subtitle: Optional[str] = Field(default=None, description="TTS text being played")
    position: Optional[int] = Field(default=None, description="Word index position")
    total_duration: Optional[int] = Field(default=None, description="Total TTS duration")
    current: Optional[int] = Field(default=None, description="Elapsed time when interrupted")
# System Events
class ErrorEvent(BaseEvent):
    """Error event."""

    event: str = Field(default="error", description="Event type")
    sender: str = Field(..., description="Component that generated the error")
    error: str = Field(..., description="Error message")
    code: Optional[int] = Field(default=None, description="Error code")
class MetricsEvent(BaseModel):
    """Performance metrics event.

    Note: subclasses BaseModel directly (not BaseEvent), so it declares its
    own ``event``/``timestamp`` fields rather than inheriting the shared
    envelope.
    """

    event: str = Field(default="metrics", description="Event type")
    timestamp: int = Field(default_factory=current_timestamp_ms, description="Event timestamp")
    key: str = Field(..., description="Metric key")
    duration: int = Field(..., description="Duration in milliseconds")
    data: Optional[Dict[str, Any]] = Field(default=None, description="Additional metric data")
class AddHistoryEvent(BaseModel):
    """Conversation history entry added event.

    Like MetricsEvent, this subclasses BaseModel directly and carries its own
    ``event``/``timestamp`` fields instead of the BaseEvent envelope.
    """

    event: str = Field(default="addHistory", description="Event type")
    timestamp: int = Field(default_factory=current_timestamp_ms, description="Event timestamp")
    sender: Optional[str] = Field(default=None, description="Component that added history")
    speaker: str = Field(..., description="Speaker identifier")
    text: str = Field(..., description="Conversation text")
class DTMFEvent(BaseEvent):
    """DTMF tone detected event.

    ``digit`` is a single telephone keypad character as documented below.
    """

    event: str = Field(default="dtmf", description="Event type")
    digit: str = Field(..., description="DTMF digit (0-9, *, #, A-D)")
class HeartBeatEvent(BaseModel):
    """Server-to-client heartbeat to keep connection alive.

    Subclasses BaseModel directly (not BaseEvent) and therefore declares its
    own ``event``/``timestamp`` fields.
    """

    event: str = Field(default="heartBeat", description="Event type")
    timestamp: int = Field(default_factory=current_timestamp_ms, description="Event timestamp in milliseconds")
# Event type mapping
# Registry from wire-level event name to its model class; consumed by
# create_event() below. Note that values are not uniformly BaseEvent
# subclasses: MetricsEvent, AddHistoryEvent and HeartBeatEvent extend
# BaseModel directly.
EVENT_TYPES = {
    "incoming": IncomingEvent,
    "answer": AnswerEvent,
    "reject": RejectEvent,
    "ringing": RingingEvent,
    "hangup": HangupEvent,
    "speaking": SpeakingEvent,
    "silence": SilenceEvent,
    "asrFinal": AsrFinalEvent,
    "asrDelta": AsrDeltaEvent,
    "eou": EouEvent,
    "trackStart": TrackStartEvent,
    "trackEnd": TrackEndEvent,
    "interruption": InterruptionEvent,
    "error": ErrorEvent,
    "metrics": MetricsEvent,
    "addHistory": AddHistoryEvent,
    "dtmf": DTMFEvent,
    "heartBeat": HeartBeatEvent,
}
def create_event(event_type: str, **kwargs) -> BaseModel:
    """
    Create an event model.

    Args:
        event_type: Type of event to create (a key of EVENT_TYPES)
        **kwargs: Event fields. A caller-supplied ``event`` key is ignored:
            the ``event`` field is always set from ``event_type``.

    Returns:
        Event model instance

    Raises:
        ValueError: If event type is unknown
    """
    event_class = EVENT_TYPES.get(event_type)
    if not event_class:
        raise ValueError(f"Unknown event type: {event_type}")
    # Drop any redundant "event" kwarg: passing it through alongside the
    # explicit event=event_type below would raise a duplicate-keyword
    # TypeError instead of constructing the model.
    kwargs.pop("event", None)
    return event_class(event=event_type, **kwargs)

View File

@@ -0,0 +1 @@
"""Protocol package."""

View File

@@ -0,0 +1 @@
"""WS v1 protocol package."""

View File

@@ -0,0 +1 @@
"""Providers package."""

View File

@@ -0,0 +1 @@
"""ASR providers."""

View File

@@ -9,7 +9,7 @@ import json
from typing import AsyncIterator, Optional
from loguru import logger
from services.base import BaseASRService, ASRResult, ServiceState
from providers.common.base import BaseASRService, ASRResult, ServiceState
# Try to import websockets for streaming ASR
try:

View File

@@ -19,7 +19,7 @@ except ImportError:
AIOHTTP_AVAILABLE = False
logger.warning("aiohttp not available - OpenAICompatibleASRService will not work")
from services.base import BaseASRService, ASRResult, ServiceState
from providers.common.base import BaseASRService, ASRResult, ServiceState
class OpenAICompatibleASRService(BaseASRService):

View File

@@ -1,6 +1,6 @@
"""Backward-compatible imports for legacy siliconflow_asr module."""
from services.openai_compatible_asr import OpenAICompatibleASRService
from providers.asr.openai_compatible import OpenAICompatibleASRService
# Backward-compatible alias
SiliconFlowASRService = OpenAICompatibleASRService

View File

@@ -0,0 +1 @@
"""Common provider types."""

View File

@@ -0,0 +1 @@
"""Provider factories."""

View File

@@ -0,0 +1,112 @@
"""Default runtime service factory implementing core extension ports."""
from __future__ import annotations
from typing import Any
from loguru import logger
from runtime.ports import (
ASRPort,
ASRServiceSpec,
LLMPort,
LLMServiceSpec,
RealtimeServiceFactory,
TTSPort,
TTSServiceSpec,
)
from providers.asr.buffered import BufferedASRService
from providers.tts.dashscope import DashScopeTTSService
from providers.llm.openai import MockLLMService, OpenAILLMService
from providers.asr.openai_compatible import OpenAICompatibleASRService
from providers.tts.openai_compatible import OpenAICompatibleTTSService
from providers.tts.mock import MockTTSService
# Provider identifiers that share the OpenAI-compatible client implementations.
_OPENAI_COMPATIBLE_PROVIDERS = {"openai_compatible", "openai-compatible", "siliconflow"}
# Providers the LLM factory can build natively; anything else falls back to the mock.
_SUPPORTED_LLM_PROVIDERS = {"openai", *_OPENAI_COMPATIBLE_PROVIDERS}
class DefaultRealtimeServiceFactory(RealtimeServiceFactory):
    """Build concrete runtime services from normalized specs.

    Each ``create_*`` method inspects the (trimmed, lowercased) provider name
    and the API key on the spec. Unsupported providers or missing credentials
    fall back to mock/buffered implementations with a log message instead of
    raising, so session startup never fails on provider configuration alone.
    """

    # Fallback endpoint/model values used when the spec leaves them unset.
    _DEFAULT_DASHSCOPE_TTS_REALTIME_URL = "wss://dashscope.aliyuncs.com/api-ws/v1/realtime"
    _DEFAULT_DASHSCOPE_TTS_MODEL = "qwen3-tts-flash-realtime"
    _DEFAULT_OPENAI_COMPATIBLE_TTS_MODEL = "FunAudioLLM/CosyVoice2-0.5B"
    _DEFAULT_OPENAI_COMPATIBLE_ASR_MODEL = "FunAudioLLM/SenseVoiceSmall"

    @staticmethod
    def _normalize_provider(provider: Any) -> str:
        """Coerce a provider value to a trimmed lowercase string ("" for None)."""
        return str(provider or "").strip().lower()

    @staticmethod
    def _resolve_dashscope_mode(raw_mode: Any) -> str:
        """Validate a DashScope TTS mode; unknown values default to "commit"."""
        mode = str(raw_mode or "commit").strip().lower()
        if mode in {"commit", "server_commit"}:
            return mode
        return "commit"

    def create_llm_service(self, spec: LLMServiceSpec) -> LLMPort:
        """Create an LLM port, or MockLLMService when provider/key unusable."""
        provider = self._normalize_provider(spec.provider)
        if provider in _SUPPORTED_LLM_PROVIDERS and spec.api_key:
            # NOTE(review): spec.temperature is not forwarded here — confirm
            # OpenAILLMService applies its own default/temperature handling.
            return OpenAILLMService(
                api_key=spec.api_key,
                base_url=spec.base_url,
                model=spec.model,
                system_prompt=spec.system_prompt,
                knowledge_config=spec.knowledge_config,
                knowledge_searcher=spec.knowledge_searcher,
            )
        logger.warning(
            "LLM provider unsupported or API key missing (provider={}); using mock LLM",
            provider or "-",
        )
        return MockLLMService()

    def create_tts_service(self, spec: TTSServiceSpec) -> TTSPort:
        """Create a TTS port, or MockTTSService when provider/key unusable."""
        provider = self._normalize_provider(spec.provider)
        if provider == "dashscope" and spec.api_key:
            return DashScopeTTSService(
                api_key=spec.api_key,
                api_url=spec.api_url or self._DEFAULT_DASHSCOPE_TTS_REALTIME_URL,
                voice=spec.voice,
                model=spec.model or self._DEFAULT_DASHSCOPE_TTS_MODEL,
                # ``mode`` is a DashScope-only concept; validated here.
                mode=self._resolve_dashscope_mode(spec.mode),
                sample_rate=spec.sample_rate,
                speed=spec.speed,
            )
        if provider in _OPENAI_COMPATIBLE_PROVIDERS and spec.api_key:
            return OpenAICompatibleTTSService(
                api_key=spec.api_key,
                api_url=spec.api_url,
                voice=spec.voice,
                model=spec.model or self._DEFAULT_OPENAI_COMPATIBLE_TTS_MODEL,
                sample_rate=spec.sample_rate,
                speed=spec.speed,
            )
        logger.warning(
            "TTS provider unsupported or API key missing (provider={}); using mock TTS",
            provider or "-",
        )
        return MockTTSService(sample_rate=spec.sample_rate)

    def create_asr_service(self, spec: ASRServiceSpec) -> ASRPort:
        """Create an ASR port, or BufferedASRService when provider/key unusable."""
        provider = self._normalize_provider(spec.provider)
        if provider in _OPENAI_COMPATIBLE_PROVIDERS and spec.api_key:
            return OpenAICompatibleASRService(
                api_key=spec.api_key,
                api_url=spec.api_url,
                model=spec.model or self._DEFAULT_OPENAI_COMPATIBLE_ASR_MODEL,
                sample_rate=spec.sample_rate,
                language=spec.language,
                interim_interval_ms=spec.interim_interval_ms,
                min_audio_for_interim_ms=spec.min_audio_for_interim_ms,
                on_transcript=spec.on_transcript,
            )
        # Buffered ASR produces no real transcription; it only accumulates audio.
        logger.info("Using buffered ASR service (provider={})", provider or "-")
        return BufferedASRService(sample_rate=spec.sample_rate, language=spec.language)

View File

@@ -0,0 +1 @@
"""LLM providers."""

View File

@@ -10,8 +10,8 @@ import uuid
from typing import AsyncIterator, Optional, List, Dict, Any, Callable, Awaitable
from loguru import logger
from app.backend_adapters import build_backend_adapter_from_settings
from services.base import BaseLLMService, LLMMessage, LLMStreamEvent, ServiceState
from adapters.control_plane.backend import build_backend_adapter_from_settings
from providers.common.base import BaseLLMService, LLMMessage, LLMStreamEvent, ServiceState
# Try to import openai
try:

View File

@@ -0,0 +1 @@
"""Realtime providers."""

View File

@@ -0,0 +1 @@
"""TTS providers."""

View File

@@ -12,7 +12,7 @@ from typing import Any, AsyncIterator, Dict, Optional, Tuple
from loguru import logger
from services.base import BaseTTSService, ServiceState, TTSChunk
from providers.common.base import BaseTTSService, ServiceState, TTSChunk
try:
import dashscope

View File

@@ -0,0 +1,49 @@
"""TTS service implementations used by the engine runtime."""
import asyncio
from typing import AsyncIterator
from loguru import logger
from providers.common.base import BaseTTSService, TTSChunk, ServiceState
class MockTTSService(BaseTTSService):
    """Mock TTS service for tests and no-provider fallback.

    Produces silence (zeroed pcm16 samples) sized at roughly 100 ms per
    whitespace-separated word, with no network access.
    """

    def __init__(
        self,
        voice: str = "mock",
        sample_rate: int = 16000,
        speed: float = 1.0,
    ):
        super().__init__(voice=voice, sample_rate=sample_rate, speed=speed)

    async def connect(self) -> None:
        """Mark the service connected; there is nothing external to open."""
        self.state = ServiceState.CONNECTED
        logger.info("Mock TTS service connected")

    async def disconnect(self) -> None:
        """Mark the service disconnected."""
        self.state = ServiceState.DISCONNECTED
        logger.info("Mock TTS service disconnected")

    async def synthesize(self, text: str) -> bytes:
        """Return zeroed pcm16 bytes whose length scales with the word count."""
        duration_ms = 100 * len(text.split())
        sample_count = int(self.sample_rate * duration_ms / 1000)
        return bytes(sample_count * 2)  # 2 bytes per pcm16 sample

    async def synthesize_stream(self, text: str) -> AsyncIterator[TTSChunk]:
        """Yield the silence payload in ~100 ms slices to emulate streaming."""
        pcm = await self.synthesize(text)
        step = self.sample_rate * 2 // 10  # bytes per 100 ms of pcm16 mono
        offset = 0
        while offset < len(pcm):
            piece = pcm[offset : offset + step]
            offset += step
            yield TTSChunk(
                audio=piece,
                sample_rate=self.sample_rate,
                is_final=offset >= len(pcm),
            )
            await asyncio.sleep(0.05)

View File

@@ -13,8 +13,8 @@ from typing import AsyncIterator, Optional
from urllib.parse import urlparse, urlunparse
from loguru import logger
from services.base import BaseTTSService, TTSChunk, ServiceState
from services.streaming_tts_adapter import StreamingTTSAdapter # backward-compatible re-export
from providers.common.base import BaseTTSService, TTSChunk, ServiceState
from providers.tts.streaming_adapter import StreamingTTSAdapter # backward-compatible re-export
class OpenAICompatibleTTSService(BaseTTSService):

View File

@@ -1,6 +1,6 @@
"""Backward-compatible imports for legacy siliconflow_tts module."""
from services.openai_compatible_tts import OpenAICompatibleTTSService, StreamingTTSAdapter
from providers.tts.openai_compatible import OpenAICompatibleTTSService, StreamingTTSAdapter
# Backward-compatible alias
SiliconFlowTTSService = OpenAICompatibleTTSService

View File

@@ -4,8 +4,8 @@ import asyncio
from loguru import logger
from services.base import BaseTTSService
from services.streaming_text import extract_tts_sentence, has_spoken_content
from providers.common.base import BaseTTSService
from providers.common.streaming_text import extract_tts_sentence, has_spoken_content
class StreamingTTSAdapter:

View File

@@ -31,7 +31,17 @@ Issues = "https://github.com/yourusername/py-active-call-cc/issues"
[tool.setuptools.packages.find]
where = ["."]
include = ["app*"]
include = [
"app*",
"adapters*",
"protocol*",
"providers*",
"processors*",
"runtime*",
"tools*",
"utils*",
"workflow*",
]
exclude = ["tests*", "scripts*", "reference*"]
[tool.black]

View File

@@ -29,10 +29,6 @@ aiohttp>=3.9.1
openai>=1.0.0
dashscope>=1.25.11
# AI Services - TTS
edge-tts>=6.1.0
pydub>=0.25.0 # For audio format conversion
# Microphone client dependencies
sounddevice>=0.4.6
soundfile>=0.12.1

View File

@@ -0,0 +1 @@
"""Runtime package."""

View File

@@ -10,7 +10,7 @@ from dataclasses import dataclass, field
from enum import Enum
from loguru import logger
from services.base import LLMMessage
from providers.common.base import LLMMessage
class ConversationState(Enum):

View File

@@ -0,0 +1 @@
"""Runtime history package."""

View File

@@ -5,10 +5,12 @@ from __future__ import annotations
import asyncio
import time
from dataclasses import dataclass
from typing import Any, Optional
from typing import Optional
from loguru import logger
from runtime.ports import ConversationHistoryStore
@dataclass
class _HistoryTranscriptJob:
@@ -29,7 +31,7 @@ class SessionHistoryBridge:
def __init__(
self,
*,
history_writer: Any,
history_writer: ConversationHistoryStore | None,
enabled: bool,
queue_max_size: int,
retry_max_attempts: int,

View File

@@ -0,0 +1 @@
"""Runtime pipeline package."""

View File

@@ -0,0 +1,13 @@
"""ASR flow helpers extracted from the duplex pipeline.
This module is intentionally lightweight for phase-wise migration.
"""
from __future__ import annotations
from providers.common.base import ASRResult
def is_final_result(result: ASRResult) -> bool:
    """Tell whether *result* carries a final (non-interim) transcript."""
    return True if result.is_final else False

View File

@@ -0,0 +1,6 @@
"""Shared constants for the runtime duplex pipeline."""
# Logical track identifiers used by the duplex pipeline.
TRACK_AUDIO_IN = "audio_in"    # inbound audio (caller -> engine)
TRACK_AUDIO_OUT = "audio_out"  # outbound audio (engine -> caller)
TRACK_CONTROL = "control"      # control/event channel
# 16k mono pcm_s16le, 20ms: 16000 samples/s * 0.020 s * 2 bytes/sample = 640.
PCM_FRAME_BYTES = 640  # 16k mono pcm_s16le, 20ms

View File

@@ -26,21 +26,25 @@ import aiohttp
from loguru import logger
from app.config import settings
from core.conversation import ConversationManager, ConversationState
from core.events import get_event_bus
from core.tool_executor import execute_server_tool
from core.transports import BaseTransport
from models.ws_v1 import ev
from providers.factory.default import DefaultRealtimeServiceFactory
from runtime.conversation import ConversationManager, ConversationState
from runtime.events import get_event_bus
from runtime.ports import (
ASRPort,
ASRServiceSpec,
LLMPort,
LLMServiceSpec,
RealtimeServiceFactory,
TTSPort,
TTSServiceSpec,
)
from tools.executor import execute_server_tool
from runtime.transports import BaseTransport
from protocol.ws_v1.schema import ev
from processors.eou import EouDetector
from processors.vad import SileroVAD, VADProcessor
from services.asr import BufferedASRService
from services.base import BaseASRService, BaseLLMService, BaseTTSService, LLMMessage, LLMStreamEvent
from services.dashscope_tts import DashScopeTTSService
from services.llm import MockLLMService, OpenAILLMService
from services.openai_compatible_asr import OpenAICompatibleASRService
from services.openai_compatible_tts import OpenAICompatibleTTSService
from services.streaming_text import extract_tts_sentence, has_spoken_content
from services.tts import EdgeTTSService, MockTTSService
from providers.common.base import LLMMessage, LLMStreamEvent
from providers.common.streaming_text import extract_tts_sentence, has_spoken_content
class DuplexPipeline:
@@ -258,9 +262,9 @@ class DuplexPipeline:
self,
transport: BaseTransport,
session_id: str,
llm_service: Optional[BaseLLMService] = None,
tts_service: Optional[BaseTTSService] = None,
asr_service: Optional[BaseASRService] = None,
llm_service: Optional[LLMPort] = None,
tts_service: Optional[TTSPort] = None,
asr_service: Optional[ASRPort] = None,
system_prompt: Optional[str] = None,
greeting: Optional[str] = None,
knowledge_searcher: Optional[
@@ -272,6 +276,7 @@ class DuplexPipeline:
server_tool_executor: Optional[
Callable[[Dict[str, Any]], Awaitable[Dict[str, Any]]]
] = None,
service_factory: Optional[RealtimeServiceFactory] = None,
):
"""
Initialize duplex pipeline.
@@ -279,8 +284,8 @@ class DuplexPipeline:
Args:
transport: Transport for sending audio/events
session_id: Session identifier
llm_service: LLM service (defaults to OpenAI)
tts_service: TTS service (defaults to EdgeTTS)
llm_service: Optional injected LLM port implementation
tts_service: Optional injected TTS port implementation
asr_service: ASR service (optional)
system_prompt: System prompt for LLM
greeting: Optional greeting to speak on start
@@ -312,6 +317,7 @@ class DuplexPipeline:
self.llm_service = llm_service
self.tts_service = tts_service
self.asr_service = asr_service # Will be initialized in start()
self._service_factory = service_factory or DefaultRealtimeServiceFactory()
self._knowledge_searcher = knowledge_searcher
self._tool_resource_resolver = tool_resource_resolver
self._server_tool_executor = server_tool_executor
@@ -776,21 +782,11 @@ class DuplexPipeline:
return False
return None
@staticmethod
def _is_openai_compatible_provider(provider: Any) -> bool:
normalized = str(provider or "").strip().lower()
return normalized in {"openai_compatible", "openai-compatible", "siliconflow"}
@staticmethod
def _is_dashscope_tts_provider(provider: Any) -> bool:
normalized = str(provider or "").strip().lower()
return normalized == "dashscope"
@staticmethod
def _is_llm_provider_supported(provider: Any) -> bool:
normalized = str(provider or "").strip().lower()
return normalized in {"openai", "openai_compatible", "openai-compatible", "siliconflow"}
@staticmethod
def _default_llm_base_url(provider: Any) -> Optional[str]:
normalized = str(provider or "").strip().lower()
@@ -798,10 +794,6 @@ class DuplexPipeline:
return "https://api.siliconflow.cn/v1"
return None
@staticmethod
def _default_dashscope_tts_realtime_url() -> str:
return "wss://dashscope.aliyuncs.com/api-ws/v1/realtime"
@staticmethod
def _default_dashscope_tts_model() -> str:
return "qwen3-tts-flash-realtime"
@@ -900,18 +892,18 @@ class DuplexPipeline:
or self._default_llm_base_url(llm_provider)
)
llm_model = self._runtime_llm.get("model") or settings.llm_model
if self._is_llm_provider_supported(llm_provider) and llm_api_key:
self.llm_service = OpenAILLMService(
api_key=llm_api_key,
base_url=llm_base_url,
model=llm_model,
self.llm_service = self._service_factory.create_llm_service(
LLMServiceSpec(
provider=llm_provider,
model=str(llm_model),
api_key=str(llm_api_key).strip() if llm_api_key else None,
base_url=str(llm_base_url).strip() if llm_base_url else None,
system_prompt=self.conversation.system_prompt,
temperature=settings.llm_temperature,
knowledge_config=self._resolved_knowledge_config(),
knowledge_searcher=self._knowledge_searcher,
)
else:
logger.warning("LLM provider unsupported or API key missing - using mock LLM")
self.llm_service = MockLLMService()
)
if hasattr(self.llm_service, "set_knowledge_config"):
self.llm_service.set_knowledge_config(self._resolved_knowledge_config())
@@ -938,41 +930,29 @@ class DuplexPipeline:
"services.tts.mode is DashScope-only and will be ignored "
f"for provider={tts_provider}"
)
if self._is_dashscope_tts_provider(tts_provider) and tts_api_key:
self.tts_service = DashScopeTTSService(
api_key=tts_api_key,
api_url=tts_api_url or self._default_dashscope_tts_realtime_url(),
voice=tts_voice,
model=tts_model or self._default_dashscope_tts_model(),
self.tts_service = self._service_factory.create_tts_service(
TTSServiceSpec(
provider=tts_provider,
api_key=str(tts_api_key).strip() if tts_api_key else None,
api_url=str(tts_api_url).strip() if tts_api_url else None,
voice=str(tts_voice),
model=str(tts_model).strip() if tts_model else None,
sample_rate=settings.sample_rate,
speed=tts_speed,
mode=str(tts_mode),
sample_rate=settings.sample_rate,
speed=tts_speed
)
logger.info("Using DashScope realtime TTS service")
elif self._is_openai_compatible_provider(tts_provider) and tts_api_key:
self.tts_service = OpenAICompatibleTTSService(
api_key=tts_api_key,
api_url=tts_api_url,
voice=tts_voice,
model=tts_model or "FunAudioLLM/CosyVoice2-0.5B",
sample_rate=settings.sample_rate,
speed=tts_speed
)
logger.info(f"Using OpenAI-compatible TTS service (provider={tts_provider})")
else:
self.tts_service = EdgeTTSService(
voice=tts_voice,
sample_rate=settings.sample_rate
)
logger.info("Using Edge TTS service")
)
try:
await self.tts_service.connect()
except Exception as e:
logger.warning(f"TTS backend unavailable ({e}); falling back to MockTTS")
self.tts_service = MockTTSService(
sample_rate=settings.sample_rate
logger.warning(f"TTS backend unavailable ({e}); falling back to default TTS adapter")
self.tts_service = self._service_factory.create_tts_service(
TTSServiceSpec(
provider="mock",
voice="mock",
sample_rate=settings.sample_rate,
)
)
await self.tts_service.connect()
else:
@@ -988,22 +968,19 @@ class DuplexPipeline:
asr_interim_interval = int(self._runtime_asr.get("interimIntervalMs") or settings.asr_interim_interval_ms)
asr_min_audio_ms = int(self._runtime_asr.get("minAudioMs") or settings.asr_min_audio_ms)
if self._is_openai_compatible_provider(asr_provider) and asr_api_key:
self.asr_service = OpenAICompatibleASRService(
api_key=asr_api_key,
api_url=asr_api_url,
model=asr_model or "FunAudioLLM/SenseVoiceSmall",
self.asr_service = self._service_factory.create_asr_service(
ASRServiceSpec(
provider=asr_provider,
sample_rate=settings.sample_rate,
language="auto",
api_key=str(asr_api_key).strip() if asr_api_key else None,
api_url=str(asr_api_url).strip() if asr_api_url else None,
model=str(asr_model).strip() if asr_model else None,
interim_interval_ms=asr_interim_interval,
min_audio_for_interim_ms=asr_min_audio_ms,
on_transcript=self._on_transcript_callback
on_transcript=self._on_transcript_callback,
)
logger.info(f"Using OpenAI-compatible ASR service (provider={asr_provider})")
else:
self.asr_service = BufferedASRService(
sample_rate=settings.sample_rate
)
logger.info("Using Buffered ASR service (no real transcription)")
)
await self.asr_service.connect()

View File

@@ -0,0 +1,12 @@
"""Output-event shaping helpers for the runtime pipeline."""
from __future__ import annotations
from typing import Any, Dict
def assistant_text_delta_event(text: str, **extra: Any) -> Dict[str, Any]:
"""Build a normalized assistant text delta payload."""
payload: Dict[str, Any] = {"type": "assistant.text.delta", "text": str(text)}
payload.update(extra)
return payload

View File

@@ -0,0 +1,8 @@
"""Interruption-related helpers extracted from the duplex pipeline."""
from __future__ import annotations
def should_interrupt(min_duration_ms: int, detected_ms: int) -> bool:
    """Return True when detected speech lasted at least the configured minimum.

    Negative minimums are clamped to zero before comparison.
    """
    threshold = int(min_duration_ms)
    if threshold < 0:
        threshold = 0
    return int(detected_ms) >= threshold

View File

@@ -0,0 +1,13 @@
"""LLM flow helpers extracted from the duplex pipeline.
This module is intentionally lightweight for phase-wise migration.
"""
from __future__ import annotations
from providers.common.base import LLMStreamEvent
def is_done_event(event: LLMStreamEvent) -> bool:
    """Check whether *event* marks the end of an LLM stream."""
    event_kind = str(event.type)
    return event_kind == "done"

View File

@@ -0,0 +1,13 @@
"""Tooling helpers extracted from the duplex pipeline."""
from __future__ import annotations
from typing import Any
def normalize_tool_name(name: Any, aliases: dict[str, str]) -> str:
    """Trim a raw tool name and resolve it through the alias map.

    Falsy or whitespace-only input yields an empty string; names without an
    alias entry pass through unchanged.
    """
    cleaned = str(name or "").strip()
    return aliases.get(cleaned, cleaned) if cleaned else ""

View File

@@ -0,0 +1,15 @@
"""TTS flow helpers extracted from the duplex pipeline.
This module is intentionally lightweight for phase-wise migration.
"""
from __future__ import annotations
from providers.common.base import TTSChunk
def chunk_duration_ms(chunk: TTSChunk) -> float:
    """Estimate playback duration of a pcm16 mono chunk, in milliseconds.

    Non-positive sample rates yield 0.0 rather than dividing by zero.
    """
    rate = chunk.sample_rate
    if rate <= 0:
        return 0.0
    return (len(chunk.audio) / 2.0 / float(rate)) * 1000.0

View File

@@ -0,0 +1,32 @@
"""Port interfaces for runtime integration boundaries."""
from runtime.ports.asr import ASRBufferControl, ASRInterimControl, ASRPort, ASRServiceSpec
from runtime.ports.control_plane import (
AssistantRuntimeConfigProvider,
ControlPlaneGateway,
ConversationHistoryStore,
KnowledgeRetriever,
ToolCatalog,
)
from runtime.ports.llm import LLMCancellable, LLMPort, LLMRuntimeConfigurable, LLMServiceSpec
from runtime.ports.service_factory import RealtimeServiceFactory
from runtime.ports.tts import TTSPort, TTSServiceSpec
__all__ = [
"ASRPort",
"ASRServiceSpec",
"ASRInterimControl",
"ASRBufferControl",
"AssistantRuntimeConfigProvider",
"ControlPlaneGateway",
"ConversationHistoryStore",
"KnowledgeRetriever",
"ToolCatalog",
"LLMCancellable",
"LLMPort",
"LLMRuntimeConfigurable",
"LLMServiceSpec",
"RealtimeServiceFactory",
"TTSPort",
"TTSServiceSpec",
]

View File

@@ -0,0 +1,64 @@
"""ASR extension port contracts."""
from __future__ import annotations
from dataclasses import dataclass
from typing import AsyncIterator, Awaitable, Callable, Optional, Protocol
from providers.common.base import ASRResult
TranscriptCallback = Callable[[str, bool], Awaitable[None]]
@dataclass(frozen=True)
class ASRServiceSpec:
    """Resolved runtime configuration for ASR service creation.

    Frozen so a spec can be passed across factory boundaries without being
    mutated after resolution.
    """

    provider: str  # provider identifier; factories normalize/trim/lowercase it
    sample_rate: int  # PCM sample rate in Hz
    language: str = "auto"  # recognition language hint; "auto" presumably delegates detection to the provider — TODO confirm
    api_key: Optional[str] = None
    api_url: Optional[str] = None
    model: Optional[str] = None  # provider model id; factories supply a default when unset
    interim_interval_ms: int = 500  # presumably the interim-transcript emission cadence — confirm with the ASR service
    min_audio_for_interim_ms: int = 300  # presumably minimum buffered audio before first interim — confirm with the ASR service
    on_transcript: Optional[TranscriptCallback] = None  # async (text, is_final) callback
class ASRPort(Protocol):
    """Port for speech recognition providers.

    Structural protocol: any object exposing these async methods satisfies
    it; implementations do not need to inherit from this class.
    """

    async def connect(self) -> None:
        """Establish connection to ASR provider."""

    async def disconnect(self) -> None:
        """Release ASR resources."""

    async def send_audio(self, audio: bytes) -> None:
        """Push one PCM audio chunk for recognition."""

    async def receive_transcripts(self) -> AsyncIterator[ASRResult]:
        """Stream partial/final recognition results."""
class ASRInterimControl(Protocol):
    """Optional extension for explicit interim transcription control.

    NOTE(review): not marked @runtime_checkable, so isinstance() checks will
    raise — callers presumably feature-detect with hasattr; confirm.
    """

    async def start_interim_transcription(self) -> None:
        """Start interim transcription loop if supported."""

    async def stop_interim_transcription(self) -> None:
        """Stop interim transcription loop if supported."""
class ASRBufferControl(Protocol):
    """Optional extension for explicit ASR buffer lifecycle control.

    Mixes sync (clear_buffer, get_and_clear_text) and async
    (get_final_transcription) members; implementers must match these
    signatures exactly for structural compatibility.
    """

    def clear_buffer(self) -> None:
        """Clear provider-side ASR buffer."""

    async def get_final_transcription(self) -> str:
        """Return final transcription for the current utterance."""

    def get_and_clear_text(self) -> str:
        """Return buffered text and clear internal state."""

View File

@@ -1,7 +1,7 @@
"""Backend integration ports.
"""Control-plane integration ports.
These interfaces define the boundary between engine runtime logic and
backend-side capabilities (config lookup, history persistence, retrieval,
control-plane capabilities (config lookup, history persistence, retrieval,
and tool resource discovery).
"""
@@ -10,14 +10,14 @@ from __future__ import annotations
from typing import Any, Dict, List, Optional, Protocol
class AssistantConfigProvider(Protocol):
class AssistantRuntimeConfigProvider(Protocol):
"""Port for loading trusted assistant runtime configuration."""
async def fetch_assistant_config(self, assistant_id: str) -> Optional[Dict[str, Any]]:
"""Fetch assistant configuration payload."""
class HistoryWriter(Protocol):
class ConversationHistoryStore(Protocol):
"""Port for persisting call and transcript history."""
async def create_call_record(
@@ -27,7 +27,7 @@ class HistoryWriter(Protocol):
assistant_id: Optional[str],
source: str = "debug",
) -> Optional[str]:
"""Create a call record and return backend call ID."""
"""Create a call record and return control-plane call ID."""
async def add_transcript(
self,
@@ -53,7 +53,7 @@ class HistoryWriter(Protocol):
"""Finalize a call record."""
class KnowledgeSearcher(Protocol):
class KnowledgeRetriever(Protocol):
"""Port for RAG / knowledge retrieval operations."""
async def search_knowledge_context(
@@ -66,19 +66,18 @@ class KnowledgeSearcher(Protocol):
"""Search a knowledge source and return ranked snippets."""
class ToolResourceResolver(Protocol):
class ToolCatalog(Protocol):
"""Port for resolving tool metadata/configuration."""
async def fetch_tool_resource(self, tool_id: str) -> Optional[Dict[str, Any]]:
"""Fetch tool resource configuration."""
class BackendGateway(
AssistantConfigProvider,
HistoryWriter,
KnowledgeSearcher,
ToolResourceResolver,
class ControlPlaneGateway(
AssistantRuntimeConfigProvider,
ConversationHistoryStore,
KnowledgeRetriever,
ToolCatalog,
Protocol,
):
"""Composite backend gateway interface used by engine services."""
"""Composite control-plane gateway used by engine services."""

View File

@@ -0,0 +1,67 @@
"""LLM extension port contracts."""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, AsyncIterator, Awaitable, Callable, Dict, List, Optional, Protocol
from providers.common.base import LLMMessage, LLMStreamEvent
KnowledgeRetrieverFn = Callable[..., Awaitable[List[Dict[str, Any]]]]
@dataclass(frozen=True)
class LLMServiceSpec:
    """Resolved runtime configuration for LLM service creation.

    Frozen/immutable; ``knowledge_config`` uses a default factory so each
    spec gets its own dict rather than a shared mutable default.
    """

    provider: str  # provider identifier; factories normalize/trim/lowercase it
    model: str  # chat model identifier (required, unlike ASR/TTS specs)
    api_key: Optional[str] = None
    base_url: Optional[str] = None  # override for the provider API endpoint
    system_prompt: Optional[str] = None
    temperature: float = 0.7  # NOTE(review): DefaultRealtimeServiceFactory does not forward this — confirm intent
    knowledge_config: Dict[str, Any] = field(default_factory=dict)
    knowledge_searcher: Optional[KnowledgeRetrieverFn] = None  # async retrieval callable for RAG
class LLMPort(Protocol):
    """Port for LLM providers.

    Structural protocol; implementations are not required to inherit from it.
    """

    async def connect(self) -> None:
        """Establish connection to LLM provider."""

    async def disconnect(self) -> None:
        """Release LLM resources."""

    async def generate(
        self,
        messages: List[LLMMessage],
        temperature: float = 0.7,
        max_tokens: Optional[int] = None,
    ) -> str:
        """Generate a complete assistant response."""

    async def generate_stream(
        self,
        messages: List[LLMMessage],
        temperature: float = 0.7,
        max_tokens: Optional[int] = None,
    ) -> AsyncIterator[LLMStreamEvent]:
        """Generate streaming assistant response events.

        NOTE(review): declared ``async def ... -> AsyncIterator`` — an
        async-generator implementation matches this shape; confirm callers
        iterate without an extra await.
        """
class LLMCancellable(Protocol):
    """Optional extension for interrupting in-flight LLM generation.

    Synchronous on purpose: cancellation must not await while a stream is
    being torn down — presumably used for barge-in; confirm with callers.
    """

    def cancel(self) -> None:
        """Cancel an in-flight generation request."""
class LLMRuntimeConfigurable(Protocol):
    """Optional extension for runtime config updates.

    Both setters accept None, allowing callers to clear previously applied
    configuration.
    """

    def set_knowledge_config(self, config: Optional[Dict[str, Any]]) -> None:
        """Apply runtime knowledge retrieval settings."""

    def set_tool_schemas(self, schemas: Optional[List[Dict[str, Any]]]) -> None:
        """Apply runtime tool schemas used for tool calling."""

View File

@@ -0,0 +1,22 @@
"""Factory port for creating runtime ASR/LLM/TTS services."""
from __future__ import annotations
from typing import Protocol
from runtime.ports.asr import ASRPort, ASRServiceSpec
from runtime.ports.llm import LLMPort, LLMServiceSpec
from runtime.ports.tts import TTSPort, TTSServiceSpec
class RealtimeServiceFactory(Protocol):
    """Port for provider-specific service construction.

    Implementations (e.g. DefaultRealtimeServiceFactory) map a resolved spec
    to a concrete provider service; they are expected to fall back rather
    than fail when a provider cannot be built.
    """

    def create_llm_service(self, spec: LLMServiceSpec) -> LLMPort:
        """Create an LLM service instance from a resolved spec."""

    def create_tts_service(self, spec: TTSServiceSpec) -> TTSPort:
        """Create a TTS service instance from a resolved spec."""

    def create_asr_service(self, spec: ASRServiceSpec) -> ASRPort:
        """Create an ASR service instance from a resolved spec."""

View File

@@ -0,0 +1,41 @@
"""TTS extension port contracts."""
from __future__ import annotations
from dataclasses import dataclass
from typing import AsyncIterator, Optional, Protocol
from providers.common.base import TTSChunk
@dataclass(frozen=True)
class TTSServiceSpec:
    """Resolved runtime configuration for TTS service creation.

    Frozen so a spec can be shared across factory boundaries safely.
    """

    provider: str  # provider identifier; factories normalize/trim/lowercase it
    voice: str  # provider voice name
    sample_rate: int  # output PCM sample rate in Hz
    speed: float = 1.0  # playback speed multiplier
    api_key: Optional[str] = None
    api_url: Optional[str] = None
    model: Optional[str] = None  # provider model id; factories supply a default when unset
    mode: str = "commit"  # DashScope-only: "commit" or "server_commit"; ignored by other providers
class TTSPort(Protocol):
    """Port for speech synthesis providers.

    Structural protocol; implementations are not required to inherit from it.
    """

    async def connect(self) -> None:
        """Establish connection to TTS provider."""

    async def disconnect(self) -> None:
        """Release TTS resources."""

    async def synthesize(self, text: str) -> bytes:
        """Synthesize complete PCM payload for text."""

    async def synthesize_stream(self, text: str) -> AsyncIterator[TTSChunk]:
        """Stream synthesized PCM chunks for text."""

    async def cancel(self) -> None:
        """Cancel an in-flight synthesis request."""

View File

@@ -0,0 +1 @@
"""Runtime session package."""

View File

@@ -0,0 +1,10 @@
"""Lifecycle helper utilities for runtime sessions."""
from __future__ import annotations
from datetime import datetime, timezone
def utc_now_iso() -> str:
    """Produce the current UTC time as a timezone-aware ISO 8601 string."""
    now = datetime.now(timezone.utc)
    return now.isoformat()

View File

@@ -9,15 +9,22 @@ from enum import Enum
from typing import Optional, Dict, Any, List
from loguru import logger
from app.backend_adapters import build_backend_adapter_from_settings
from core.transports import BaseTransport
from core.duplex_pipeline import DuplexPipeline
from core.conversation import ConversationTurn
from core.history_bridge import SessionHistoryBridge
from core.workflow_runner import WorkflowRunner, WorkflowTransition, WorkflowNodeDef, WorkflowEdgeDef
from adapters.control_plane.backend import build_backend_adapter_from_settings
from runtime.transports import BaseTransport
from runtime.ports import (
AssistantRuntimeConfigProvider,
ControlPlaneGateway,
ConversationHistoryStore,
KnowledgeRetriever,
ToolCatalog,
)
from runtime.pipeline.duplex import DuplexPipeline
from runtime.conversation import ConversationTurn
from runtime.history.bridge import SessionHistoryBridge
from workflow.runner import WorkflowRunner, WorkflowTransition, WorkflowNodeDef, WorkflowEdgeDef
from app.config import settings
from services.base import LLMMessage
from models.ws_v1 import (
from providers.common.base import LLMMessage
from protocol.ws_v1.schema import (
parse_client_message,
ev,
SessionStartMessage,
@@ -97,7 +104,11 @@ class Session:
session_id: str,
transport: BaseTransport,
use_duplex: bool = None,
backend_gateway: Optional[Any] = None,
control_plane_gateway: Optional[ControlPlaneGateway] = None,
runtime_config_provider: Optional[AssistantRuntimeConfigProvider] = None,
history_store: Optional[ConversationHistoryStore] = None,
knowledge_retriever: Optional[KnowledgeRetriever] = None,
tool_catalog: Optional[ToolCatalog] = None,
assistant_id: Optional[str] = None,
):
"""
@@ -107,15 +118,24 @@ class Session:
session_id: Unique session identifier
transport: Transport instance for communication
use_duplex: Whether to use duplex pipeline (defaults to settings.duplex_enabled)
control_plane_gateway: Optional composite control-plane dependency
runtime_config_provider: Optional assistant runtime config provider
history_store: Optional conversation history store
knowledge_retriever: Optional knowledge retrieval dependency
tool_catalog: Optional tool resource catalog
"""
self.id = session_id
self.transport = transport
self.use_duplex = use_duplex if use_duplex is not None else settings.duplex_enabled
self.audio_frame_bytes = self._compute_audio_frame_bytes()
self._assistant_id = str(assistant_id or "").strip() or None
self._backend_gateway = backend_gateway or build_backend_adapter_from_settings()
self._control_plane_gateway = control_plane_gateway or build_backend_adapter_from_settings()
self._runtime_config_provider = runtime_config_provider or self._control_plane_gateway
self._history_store = history_store or self._control_plane_gateway
self._knowledge_retriever = knowledge_retriever or self._control_plane_gateway
self._tool_catalog = tool_catalog or self._control_plane_gateway
self._history_bridge = SessionHistoryBridge(
history_writer=self._backend_gateway,
history_writer=self._history_store,
enabled=settings.history_enabled,
queue_max_size=settings.history_queue_max_size,
retry_max_attempts=settings.history_retry_max_attempts,
@@ -128,8 +148,8 @@ class Session:
session_id=session_id,
system_prompt=settings.duplex_system_prompt,
greeting=settings.duplex_greeting,
knowledge_searcher=getattr(self._backend_gateway, "search_knowledge_context", None),
tool_resource_resolver=getattr(self._backend_gateway, "fetch_tool_resource", None),
knowledge_searcher=getattr(self._knowledge_retriever, "search_knowledge_context", None),
tool_resource_resolver=getattr(self._tool_catalog, "fetch_tool_resource", None),
)
# Session state
@@ -935,18 +955,18 @@ class Session:
self,
assistant_id: str,
) -> tuple[Dict[str, Any], Optional[Dict[str, str]]]:
"""Load trusted runtime metadata from backend assistant config."""
"""Load trusted runtime metadata from control-plane assistant config."""
if not assistant_id:
return {}, {
"code": "protocol.assistant_id_required",
"message": "Missing required query parameter assistant_id",
}
provider = getattr(self._backend_gateway, "fetch_assistant_config", None)
provider = getattr(self._runtime_config_provider, "fetch_assistant_config", None)
if not callable(provider):
return {}, {
"code": "assistant.config_unavailable",
"message": "Assistant config backend unavailable",
"message": "Assistant config control plane unavailable",
}
payload = await provider(str(assistant_id).strip())

View File

@@ -0,0 +1,9 @@
"""Metadata helpers extracted from session manager."""
from __future__ import annotations
import re
from typing import Pattern
# Valid dynamic-variable key: letter or underscore first, then up to 63 more
# word characters (64 chars total max); anchored over the whole string.
DYNAMIC_VARIABLE_KEY_RE: Pattern[str] = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]{0,63}$")
# Matches `{{ name }}` placeholders (optional inner whitespace) and captures the
# variable name. NOTE(review): unlike the key pattern, the placeholder grammar
# imposes no length cap on the captured name.
DYNAMIC_VARIABLE_PLACEHOLDER_RE: Pattern[str] = re.compile(r"\{\{\s*([a-zA-Z_][a-zA-Z0-9_]*)\s*\}\}")

View File

@@ -0,0 +1,12 @@
"""Workflow bridge helpers for runtime session orchestration."""
from __future__ import annotations
from typing import Optional
from workflow.runner import WorkflowRunner
def has_active_workflow(workflow_runner: Optional[WorkflowRunner]) -> bool:
    """Return whether a workflow runner exists and has a current node."""
    if not workflow_runner:
        return False
    return workflow_runner.current_node is not None

View File

@@ -1,53 +0,0 @@
"""AI Services package.
Provides ASR, LLM, TTS, and Realtime API services for voice conversation.
"""
from services.base import (
ServiceState,
ASRResult,
LLMMessage,
TTSChunk,
BaseASRService,
BaseLLMService,
BaseTTSService,
)
from services.llm import OpenAILLMService, MockLLMService
from services.dashscope_tts import DashScopeTTSService
from services.tts import EdgeTTSService, MockTTSService
from services.asr import BufferedASRService, MockASRService
from services.openai_compatible_asr import OpenAICompatibleASRService, SiliconFlowASRService
from services.openai_compatible_tts import OpenAICompatibleTTSService, SiliconFlowTTSService
from services.streaming_tts_adapter import StreamingTTSAdapter
from services.realtime import RealtimeService, RealtimeConfig, RealtimePipeline
# Explicit public API surface: `from services import *` exposes exactly
# these names, and they double as the package's supported import points.
__all__ = [
    # Base classes
    "ServiceState",
    "ASRResult",
    "LLMMessage",
    "TTSChunk",
    "BaseASRService",
    "BaseLLMService",
    "BaseTTSService",
    # LLM
    "OpenAILLMService",
    "MockLLMService",
    # TTS
    "DashScopeTTSService",
    "EdgeTTSService",
    "MockTTSService",
    # ASR
    "BufferedASRService",
    "MockASRService",
    "OpenAICompatibleASRService",
    "SiliconFlowASRService",
    # TTS (SiliconFlow)
    "OpenAICompatibleTTSService",
    "SiliconFlowTTSService",
    "StreamingTTSAdapter",
    # Realtime
    "RealtimeService",
    "RealtimeConfig",
    "RealtimePipeline",
]

View File

@@ -1,271 +0,0 @@
"""TTS (Text-to-Speech) Service implementations.
Provides multiple TTS backend options including edge-tts (free)
and placeholder for cloud services.
"""
import os
import io
import asyncio
import struct
from typing import AsyncIterator, Optional
from loguru import logger
from services.base import BaseTTSService, TTSChunk, ServiceState
# Try to import edge-tts.
# Optional dependency: the import is guarded so the rest of the module loads
# without it; EdgeTTSService raises RuntimeError at connect/synthesize time
# when the package is missing (see EDGE_TTS_AVAILABLE checks below).
try:
    import edge_tts
    EDGE_TTS_AVAILABLE = True
except ImportError:
    EDGE_TTS_AVAILABLE = False
    logger.warning("edge-tts not available - EdgeTTS service will be disabled")
class EdgeTTSService(BaseTTSService):
    """
    Microsoft Edge TTS service.

    Uses edge-tts library for free, high-quality speech synthesis.
    Supports streaming for low-latency playback. edge-tts emits MP3, which
    is decoded to 16-bit mono PCM via pydub or an ffmpeg subprocess.
    """

    # Voice mapping for common languages: bare language codes resolve to a
    # default neural voice for that locale.
    VOICE_MAP = {
        "en": "en-US-JennyNeural",
        "en-US": "en-US-JennyNeural",
        "en-GB": "en-GB-SoniaNeural",
        "zh": "zh-CN-XiaoxiaoNeural",
        "zh-CN": "zh-CN-XiaoxiaoNeural",
        "zh-TW": "zh-TW-HsiaoChenNeural",
        "ja": "ja-JP-NanamiNeural",
        "ko": "ko-KR-SunHiNeural",
        "fr": "fr-FR-DeniseNeural",
        "de": "de-DE-KatjaNeural",
        "es": "es-ES-ElviraNeural",
    }

    def __init__(
        self,
        voice: str = "en-US-JennyNeural",
        sample_rate: int = 16000,
        speed: float = 1.0
    ):
        """
        Initialize Edge TTS service.

        Args:
            voice: Voice name (e.g., "en-US-JennyNeural") or language code (e.g., "en")
            sample_rate: Target sample rate (audio is resampled to this during MP3->PCM conversion)
            speed: Speech speed multiplier (1.0 = normal)
        """
        # Resolve voice from language code if needed
        if voice in self.VOICE_MAP:
            voice = self.VOICE_MAP[voice]
        super().__init__(voice=voice, sample_rate=sample_rate, speed=speed)
        # Cooperative cancellation flag checked inside synthesize_stream.
        self._cancel_event = asyncio.Event()

    async def connect(self) -> None:
        """Edge TTS doesn't require explicit connection; just validates availability."""
        if not EDGE_TTS_AVAILABLE:
            raise RuntimeError("edge-tts package not installed")
        self.state = ServiceState.CONNECTED
        logger.info(f"Edge TTS service ready: voice={self.voice}")

    async def disconnect(self) -> None:
        """Edge TTS doesn't require explicit disconnection; only updates state."""
        self.state = ServiceState.DISCONNECTED
        logger.info("Edge TTS service disconnected")

    def _get_rate_string(self) -> str:
        """Convert speed multiplier to the rate string format edge-tts expects."""
        # edge-tts uses signed percentage format: "+0%", "-10%", "+20%"
        percentage = int((self.speed - 1.0) * 100)
        if percentage >= 0:
            return f"+{percentage}%"
        return f"{percentage}%"

    async def synthesize(self, text: str) -> bytes:
        """
        Synthesize complete audio for text.

        Args:
            text: Text to synthesize

        Returns:
            PCM audio data (16-bit, mono, at self.sample_rate)

        Raises:
            RuntimeError: if the edge-tts package is not installed.
        """
        if not EDGE_TTS_AVAILABLE:
            raise RuntimeError("edge-tts not available")
        # Collect all chunks from the streaming path.
        # NOTE(review): repeated += concatenation is quadratic for long texts;
        # b"".join over a list would be linear.
        audio_data = b""
        async for chunk in self.synthesize_stream(text):
            audio_data += chunk.audio
        return audio_data

    async def synthesize_stream(self, text: str) -> AsyncIterator[TTSChunk]:
        """
        Synthesize audio in streaming mode.

        Collects the full MP3 payload from edge-tts first, converts it to PCM,
        then yields 100ms PCM chunks — so first-chunk latency includes the
        whole synthesis, not true incremental streaming.

        Args:
            text: Text to synthesize

        Yields:
            TTSChunk objects with PCM audio; the last chunk has is_final=True.
        """
        if not EDGE_TTS_AVAILABLE:
            raise RuntimeError("edge-tts not available")
        self._cancel_event.clear()
        try:
            communicate = edge_tts.Communicate(
                text,
                voice=self.voice,
                rate=self._get_rate_string()
            )
            # edge-tts outputs MP3, we need to decode to PCM.
            # For now, collect MP3 chunks and yield after conversion.
            mp3_data = b""
            async for chunk in communicate.stream():
                # Check for cancellation between network chunks.
                if self._cancel_event.is_set():
                    logger.info("TTS synthesis cancelled")
                    return
                if chunk["type"] == "audio":
                    mp3_data += chunk["data"]
            # Convert MP3 to PCM
            if mp3_data:
                pcm_data = await self._convert_mp3_to_pcm(mp3_data)
                if pcm_data:
                    # Yield in chunks for streaming playback
                    chunk_size = self.sample_rate * 2 // 10  # 100ms chunks (16-bit mono)
                    for i in range(0, len(pcm_data), chunk_size):
                        if self._cancel_event.is_set():
                            return
                        chunk_data = pcm_data[i:i + chunk_size]
                        yield TTSChunk(
                            audio=chunk_data,
                            sample_rate=self.sample_rate,
                            is_final=(i + chunk_size >= len(pcm_data))
                        )
        except asyncio.CancelledError:
            logger.info("TTS synthesis cancelled via asyncio")
            raise
        except Exception as e:
            logger.error(f"TTS synthesis error: {e}")
            raise

    async def _convert_mp3_to_pcm(self, mp3_data: bytes) -> bytes:
        """
        Convert MP3 audio to PCM (16-bit mono at self.sample_rate).

        Uses pydub when available, otherwise falls back to an ffmpeg
        subprocess. Returns b"" on conversion failure.
        """
        try:
            # Try using pydub (requires ffmpeg)
            from pydub import AudioSegment
            # Load MP3 from bytes
            audio = AudioSegment.from_mp3(io.BytesIO(mp3_data))
            # Convert to target format
            audio = audio.set_frame_rate(self.sample_rate)
            audio = audio.set_channels(1)
            audio = audio.set_sample_width(2)  # 16-bit
            # Export as raw PCM
            return audio.raw_data
        except ImportError:
            logger.warning("pydub not available, trying fallback")
            # Fallback: Use subprocess to call ffmpeg directly
            return await self._ffmpeg_convert(mp3_data)
        except Exception as e:
            logger.error(f"Audio conversion error: {e}")
            return b""

    async def _ffmpeg_convert(self, mp3_data: bytes) -> bytes:
        """Convert MP3 to PCM using an ffmpeg subprocess; returns b"" on error."""
        try:
            # s16le = signed 16-bit little-endian raw PCM; -ac 1 forces mono.
            process = await asyncio.create_subprocess_exec(
                "ffmpeg",
                "-i", "pipe:0",
                "-f", "s16le",
                "-acodec", "pcm_s16le",
                "-ar", str(self.sample_rate),
                "-ac", "1",
                "pipe:1",
                stdin=asyncio.subprocess.PIPE,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.DEVNULL
            )
            stdout, _ = await process.communicate(input=mp3_data)
            return stdout
        except Exception as e:
            logger.error(f"ffmpeg conversion error: {e}")
            return b""

    async def cancel(self) -> None:
        """Cancel ongoing synthesis by setting the cooperative cancel flag."""
        self._cancel_event.set()
class MockTTSService(BaseTTSService):
    """
    Mock TTS service for testing without actual synthesis.

    Produces silent PCM whose duration scales with the input word count,
    so pipelines can be exercised end-to-end without a real backend.
    """

    def __init__(
        self,
        voice: str = "mock",
        sample_rate: int = 16000,
        speed: float = 1.0
    ):
        super().__init__(voice=voice, sample_rate=sample_rate, speed=speed)

    async def connect(self) -> None:
        """Mark the service as connected; no external resource is touched."""
        self.state = ServiceState.CONNECTED
        logger.info("Mock TTS service connected")

    async def disconnect(self) -> None:
        """Mark the service as disconnected."""
        self.state = ServiceState.DISCONNECTED
        logger.info("Mock TTS service disconnected")

    async def synthesize(self, text: str) -> bytes:
        """Generate silence sized at roughly 100ms of audio per word."""
        silence_ms = 100 * len(text.split())
        sample_count = int(self.sample_rate * silence_ms / 1000)
        # 16-bit mono PCM: two zero bytes per sample.
        return bytes(2 * sample_count)

    async def synthesize_stream(self, text: str) -> AsyncIterator[TTSChunk]:
        """Yield the silence payload in 100ms chunks with a short pacing delay."""
        pcm = await self.synthesize(text)
        step = self.sample_rate * 2 // 10  # bytes per 100ms of 16-bit mono audio
        offset = 0
        while offset < len(pcm):
            piece = pcm[offset:offset + step]
            offset += step
            yield TTSChunk(
                audio=piece,
                sample_rate=self.sample_rate,
                is_final=(offset >= len(pcm))
            )
            await asyncio.sleep(0.05)  # Simulate processing time

View File

@@ -1,7 +1,7 @@
import aiohttp
import pytest
from app.backend_adapters import (
from adapters.control_plane.backend import (
AssistantConfigSourceAdapter,
LocalYamlAssistantConfigAdapter,
build_backend_adapter,
@@ -120,7 +120,7 @@ async def test_http_backend_adapter_create_call_record_posts_expected_payload(mo
},
)
monkeypatch.setattr("app.backend_adapters.aiohttp.ClientSession", _FakeClientSession)
monkeypatch.setattr("adapters.control_plane.backend.aiohttp.ClientSession", _FakeClientSession)
config_dir = tmp_path / "assistants"
config_dir.mkdir(parents=True, exist_ok=True)
@@ -198,7 +198,7 @@ async def test_with_backend_url_uses_backend_for_assistant_config(monkeypatch, t
_ = (url, json)
return _FakeResponse(status=200, payload={"id": "call_1"})
monkeypatch.setattr("app.backend_adapters.aiohttp.ClientSession", _FakeClientSession)
monkeypatch.setattr("adapters.control_plane.backend.aiohttp.ClientSession", _FakeClientSession)
config_dir = tmp_path / "assistants"
config_dir.mkdir(parents=True, exist_ok=True)
@@ -234,7 +234,7 @@ async def test_backend_mode_disabled_uses_local_assistant_config_even_with_url(m
_ = timeout
raise AssertionError("HTTP client should not be created when backend_mode=disabled")
monkeypatch.setattr("app.backend_adapters.aiohttp.ClientSession", _FailIfCalledClientSession)
monkeypatch.setattr("adapters.control_plane.backend.aiohttp.ClientSession", _FailIfCalledClientSession)
config_dir = tmp_path / "assistants"
config_dir.mkdir(parents=True, exist_ok=True)

View File

@@ -1,4 +1,4 @@
from core.session import Session
from runtime.session.manager import Session
def _session() -> Session:

View File

@@ -3,7 +3,7 @@ import time
import pytest
from core.history_bridge import SessionHistoryBridge
from runtime.history.bridge import SessionHistoryBridge
class _FakeHistoryWriter:

View File

@@ -4,10 +4,10 @@ from typing import Any, Dict, List
import pytest
from core.conversation import ConversationState
from core.duplex_pipeline import DuplexPipeline
from models.ws_v1 import OutputAudioPlayedMessage, ToolCallResultsMessage, parse_client_message
from services.base import LLMStreamEvent
from runtime.conversation import ConversationState
from runtime.pipeline.duplex import DuplexPipeline
from protocol.ws_v1.schema import OutputAudioPlayedMessage, ToolCallResultsMessage, parse_client_message
from providers.common.base import LLMStreamEvent
class _DummySileroVAD:
@@ -86,9 +86,9 @@ class _CaptureGenerateLLM:
def _build_pipeline(monkeypatch, llm_rounds: List[List[LLMStreamEvent]]) -> tuple[DuplexPipeline, List[Dict[str, Any]]]:
monkeypatch.setattr("core.duplex_pipeline.SileroVAD", _DummySileroVAD)
monkeypatch.setattr("core.duplex_pipeline.VADProcessor", _DummyVADProcessor)
monkeypatch.setattr("core.duplex_pipeline.EouDetector", _DummyEouDetector)
monkeypatch.setattr("runtime.pipeline.duplex.SileroVAD", _DummySileroVAD)
monkeypatch.setattr("runtime.pipeline.duplex.VADProcessor", _DummyVADProcessor)
monkeypatch.setattr("runtime.pipeline.duplex.EouDetector", _DummyEouDetector)
pipeline = DuplexPipeline(
transport=_FakeTransport(),
@@ -112,7 +112,7 @@ def _build_pipeline(monkeypatch, llm_rounds: List[List[LLMStreamEvent]]) -> tupl
def test_pipeline_uses_default_tools_from_settings(monkeypatch):
monkeypatch.setattr(
"core.duplex_pipeline.settings.tools",
"runtime.pipeline.duplex.settings.tools",
[
"current_time",
"calculator",
@@ -141,7 +141,7 @@ def test_pipeline_uses_default_tools_from_settings(monkeypatch):
def test_pipeline_exposes_unknown_string_tools_with_fallback_schema(monkeypatch):
monkeypatch.setattr("core.duplex_pipeline.settings.tools", ["custom_system_cmd"])
monkeypatch.setattr("runtime.pipeline.duplex.settings.tools", ["custom_system_cmd"])
pipeline, _events = _build_pipeline(monkeypatch, [[LLMStreamEvent(type="done")]])
schemas = pipeline._resolved_tool_schemas()
@@ -151,7 +151,7 @@ def test_pipeline_exposes_unknown_string_tools_with_fallback_schema(monkeypatch)
def test_pipeline_assigns_default_client_executor_for_system_string_tools(monkeypatch):
monkeypatch.setattr("core.duplex_pipeline.settings.tools", ["increase_volume"])
monkeypatch.setattr("runtime.pipeline.duplex.settings.tools", ["increase_volume"])
pipeline, _events = _build_pipeline(monkeypatch, [[LLMStreamEvent(type="done")]])
tool_call = {
@@ -221,9 +221,9 @@ async def test_pipeline_applies_default_args_to_tool_call(monkeypatch):
@pytest.mark.asyncio
async def test_generated_opener_prompt_uses_system_prompt_only(monkeypatch):
monkeypatch.setattr("core.duplex_pipeline.SileroVAD", _DummySileroVAD)
monkeypatch.setattr("core.duplex_pipeline.VADProcessor", _DummyVADProcessor)
monkeypatch.setattr("core.duplex_pipeline.EouDetector", _DummyEouDetector)
monkeypatch.setattr("runtime.pipeline.duplex.SileroVAD", _DummySileroVAD)
monkeypatch.setattr("runtime.pipeline.duplex.VADProcessor", _DummyVADProcessor)
monkeypatch.setattr("runtime.pipeline.duplex.EouDetector", _DummyEouDetector)
llm = _CaptureGenerateLLM("你好")
pipeline = DuplexPipeline(
@@ -662,7 +662,7 @@ async def test_server_tool_timeout_emits_504_and_continues(monkeypatch):
"status": {"code": 200, "message": "ok"},
}
monkeypatch.setattr("core.duplex_pipeline.execute_server_tool", _slow_execute)
monkeypatch.setattr("runtime.pipeline.duplex.execute_server_tool", _slow_execute)
pipeline, events = _build_pipeline(
monkeypatch,

View File

@@ -1,6 +1,6 @@
import pytest
from core.tool_executor import execute_server_tool
from tools.executor import execute_server_tool
@pytest.mark.asyncio
@@ -38,7 +38,7 @@ async def test_current_time_uses_local_system_clock(monkeypatch):
async def _should_not_be_called(_tool_id):
raise AssertionError("fetch_tool_resource should not be called for current_time")
monkeypatch.setattr("core.tool_executor.fetch_tool_resource", _should_not_be_called)
monkeypatch.setattr("tools.executor.fetch_tool_resource", _should_not_be_called)
result = await execute_server_tool(
{

View File

@@ -1,7 +1,7 @@
import pytest
from core.session import Session, WsSessionState
from models.ws_v1 import OutputAudioPlayedMessage, SessionStartMessage, parse_client_message
from runtime.session.manager import Session, WsSessionState
from protocol.ws_v1.schema import OutputAudioPlayedMessage, SessionStartMessage, parse_client_message
def _session() -> Session:
@@ -139,7 +139,7 @@ async def test_load_server_runtime_metadata_returns_not_found_error():
_ = assistant_id
return {"__error_code": "assistant.not_found"}
session._backend_gateway = _Gateway()
session._runtime_config_provider = _Gateway()
runtime, error = await session._load_server_runtime_metadata("assistant_demo")
assert runtime == {}
assert error is not None
@@ -155,7 +155,7 @@ async def test_load_server_runtime_metadata_returns_config_unavailable_error():
_ = assistant_id
return None
session._backend_gateway = _Gateway()
session._runtime_config_provider = _Gateway()
runtime, error = await session._load_server_runtime_metadata("assistant_demo")
assert runtime == {}
assert error is not None
@@ -194,7 +194,7 @@ async def test_handle_session_start_requires_assistant_id_and_closes_transport()
@pytest.mark.asyncio
async def test_handle_session_start_applies_whitelisted_overrides_and_ignores_workflow(monkeypatch):
monkeypatch.setattr("core.session.settings.ws_emit_config_resolved", False)
monkeypatch.setattr("runtime.session.manager.settings.ws_emit_config_resolved", False)
session = Session.__new__(Session)
session.id = "sess_start_ok"
@@ -289,9 +289,9 @@ async def test_handle_session_start_applies_whitelisted_overrides_and_ignores_wo
@pytest.mark.asyncio
async def test_handle_session_start_emits_config_resolved_when_enabled(monkeypatch):
monkeypatch.setattr("core.session.settings.ws_emit_config_resolved", True)
monkeypatch.setattr("core.session.settings.ws_protocol_version", "v1-custom")
monkeypatch.setattr("core.session.settings.default_codec", "pcmu")
monkeypatch.setattr("runtime.session.manager.settings.ws_emit_config_resolved", True)
monkeypatch.setattr("runtime.session.manager.settings.ws_protocol_version", "v1-custom")
monkeypatch.setattr("runtime.session.manager.settings.default_codec", "pcmu")
session = Session.__new__(Session)
session.id = "sess_start_emit_config"
@@ -385,8 +385,8 @@ async def test_handle_session_start_emits_config_resolved_when_enabled(monkeypat
@pytest.mark.asyncio
async def test_handle_audio_uses_chunk_size_for_frame_validation(monkeypatch):
monkeypatch.setattr("core.session.settings.sample_rate", 16000)
monkeypatch.setattr("core.session.settings.chunk_size_ms", 10)
monkeypatch.setattr("runtime.session.manager.settings.sample_rate", 16000)
monkeypatch.setattr("runtime.session.manager.settings.chunk_size_ms", 10)
session = Session.__new__(Session)
session.id = "sess_chunk_frame"

1
engine/tools/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""Tools package."""

View File

@@ -8,7 +8,7 @@ from typing import Any, Awaitable, Callable, Dict, Optional
import aiohttp
from app.backend_adapters import build_backend_adapter_from_settings
from adapters.control_plane.backend import build_backend_adapter_from_settings
ToolResourceFetcher = Callable[[str], Awaitable[Optional[Dict[str, Any]]]]

View File

@@ -0,0 +1 @@
"""Workflow package."""