# Rate Limiting

Understanding and handling rate limits in the FastGPT API.

## Understanding Rate Limits

The FastGPT API may enforce rate limits to:

- Prevent API abuse
- Ensure fair resource allocation
- Maintain system stability

When you exceed the rate limit, you'll receive a `429 Too Many Requests` response.

## RateLimitError

The SDK raises `RateLimitError` when rate limits are exceeded:

```python
from fastgpt_client import ChatClient
from fastgpt_client.exceptions import RateLimitError

try:
    response = client.create_chat_completion(
        messages=[{"role": "user", "content": "Hello"}]
    )
except RateLimitError as e:
    print("Rate limit exceeded!")
    print(f"Status code: {e.status_code}")  # 429
    print(f"Retry after: {e.retry_after}")  # Suggested wait time
```

## Handling Rate Limits

### 1. Simple Retry with Delay

```python
import time

from fastgpt_client.exceptions import RateLimitError


def chat_with_retry(client, messages, max_retries=3):
    """Retry on rate limit with a fixed delay."""
    for attempt in range(max_retries):
        try:
            response = client.create_chat_completion(
                messages=messages,
                stream=False
            )
            response.raise_for_status()
            return response.json()
        except RateLimitError as e:
            if attempt < max_retries - 1:
                # Use the Retry-After hint or default to 5 seconds
                wait_time = int(e.retry_after) if e.retry_after else 5
                print(f"Rate limited. Waiting {wait_time} seconds...")
                time.sleep(wait_time)
            else:
                print("Max retries exceeded")
                raise
```

### 2. Exponential Backoff

```python
import random
import time

from fastgpt_client.exceptions import RateLimitError


def chat_with_backoff(client, messages, max_retries=5):
    """Retry with exponential backoff."""
    base_delay = 1  # Start with 1 second
    for attempt in range(max_retries):
        try:
            response = client.create_chat_completion(
                messages=messages,
                stream=False
            )
            response.raise_for_status()
            return response.json()
        except RateLimitError:
            if attempt < max_retries - 1:
                # Exponential backoff: 1s, 2s, 4s, 8s, ...
                delay = base_delay * (2 ** attempt)
                # Add jitter to avoid the thundering-herd problem
                jitter = random.uniform(0, 0.5 * delay)
                wait_time = delay + jitter
                print(f"Rate limited. Waiting {wait_time:.1f} seconds...")
                time.sleep(wait_time)
            else:
                raise
```

### 3. Async Retry with Backoff

```python
import asyncio

from fastgpt_client.exceptions import RateLimitError


async def async_chat_with_retry(client, messages, max_retries=5):
    """Async retry with exponential backoff."""
    base_delay = 1
    for attempt in range(max_retries):
        try:
            response = await client.create_chat_completion(
                messages=messages,
                stream=False
            )
            response.raise_for_status()
            return response.json()
        except RateLimitError:
            if attempt < max_retries - 1:
                delay = base_delay * (2 ** attempt)
                print(f"Rate limited. Waiting {delay} seconds...")
                await asyncio.sleep(delay)
            else:
                raise
```
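Because `async_chat_with_retry` never blocks the event loop while waiting, it combines naturally with concurrent requests. A minimal sketch that caps in-flight requests with `asyncio.Semaphore` (the `max_concurrency` value here is purely illustrative, not a documented server limit):

```python
import asyncio


async def run_batch(client, prompts, max_concurrency=3):
    """Send several prompts concurrently, retrying each on rate limits."""
    # The semaphore caps in-flight requests, making it less likely
    # that we hit the rate limit in the first place.
    semaphore = asyncio.Semaphore(max_concurrency)

    async def send(prompt):
        async with semaphore:
            return await async_chat_with_retry(
                client, [{"role": "user", "content": prompt}]
            )

    return await asyncio.gather(*(send(p) for p in prompts))


# Usage (inside an async context):
# results = await run_batch(client, ["Hello", "How are you?", "Goodbye"])
```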
### 4. Rate Limiter Class

```python
import time
from threading import Lock
from typing import Optional


class RateLimiter:
    """Token bucket rate limiter."""

    def __init__(self, rate: int, per: float = 60.0):
        """
        Args:
            rate: Number of requests allowed
            per: Time period in seconds
        """
        self.rate = rate
        self.per = per
        self.allowance = rate
        self.last_check = time.time()
        self.lock = Lock()

    def acquire(self, block: bool = True, timeout: Optional[float] = None) -> bool:
        """Acquire a token from the bucket."""
        with self.lock:
            current = time.time()
            time_passed = current - self.last_check
            self.last_check = current

            # Refill the bucket based on elapsed time
            self.allowance += time_passed * (self.rate / self.per)
            if self.allowance > self.rate:
                self.allowance = self.rate

            if self.allowance < 1.0:
                if not block:
                    return False
                # Wait until one full token has accumulated
                sleep_time = (1.0 - self.allowance) * (self.per / self.rate)
                if timeout is not None and sleep_time > timeout:
                    return False
                time.sleep(sleep_time)
                self.allowance = 0.0
            else:
                self.allowance -= 1.0
            return True


# Usage
rate_limiter = RateLimiter(rate=10, per=60)  # 10 requests per minute

for i in range(15):
    # block=False returns immediately instead of waiting for a token
    if rate_limiter.acquire(block=False):
        response = client.create_chat_completion(
            messages=[{"role": "user", "content": f"Message {i}"}]
        )
        print(f"Sent message {i}")
    else:
        print(f"Rate limited, skipping message {i}")
```
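Since `acquire()` synchronizes on a lock, a single `RateLimiter` instance can also be shared across worker threads. A minimal sketch, assuming the client itself is safe to call from multiple threads:

```python
from concurrent.futures import ThreadPoolExecutor

rate_limiter = RateLimiter(rate=10, per=60)

def send_one(i: int):
    # The shared limiter keeps all workers collectively under
    # 10 requests per minute; give up if no token frees up in 30 s.
    if not rate_limiter.acquire(timeout=30):
        return None
    return client.create_chat_completion(
        messages=[{"role": "user", "content": f"Message {i}"}]
    )

with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(send_one, range(20)))
```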
### 5. Decorator for Rate Limiting

```python
import functools
import time

from fastgpt_client.exceptions import RateLimitError


def rate_limit_retry(max_retries=3, base_delay=1):
    """Decorator to handle rate limiting with retries."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except RateLimitError as e:
                    if attempt < max_retries - 1:
                        delay = base_delay * (2 ** attempt)
                        # Prefer the server's Retry-After hint over backoff
                        wait_time = int(e.retry_after) if e.retry_after else delay
                        print(f"Rate limited. Waiting {wait_time} seconds...")
                        time.sleep(wait_time)
                    else:
                        raise
        return wrapper
    return decorator


# Usage
@rate_limit_retry(max_retries=3, base_delay=2)
def send_message(client, message: str):
    response = client.create_chat_completion(
        messages=[{"role": "user", "content": message}],
        stream=False
    )
    response.raise_for_status()
    return response.json()
```

## Monitoring Rate Limits

```python
import time
from collections import defaultdict
from threading import Lock


class RequestMonitor:
    """Monitor API request rates."""

    def __init__(self, window_seconds=60):
        self.window = window_seconds
        self.requests = defaultdict(list)
        self.lock = Lock()

    def record_request(self, endpoint: str):
        """Record an API request."""
        with self.lock:
            now = time.time()
            self.requests[endpoint].append(now)
            # Drop requests that have fallen outside the window
            cutoff = now - self.window
            self.requests[endpoint] = [
                t for t in self.requests[endpoint] if t > cutoff
            ]

    def get_rate(self, endpoint: str) -> float:
        """Get requests per second for an endpoint."""
        with self.lock:
            recent = self.requests[endpoint]
            if not recent:
                return 0.0
            return len(recent) / self.window

    def is_rate_limited(self, endpoint: str, limit: int) -> bool:
        """Check whether an endpoint has reached its limit."""
        with self.lock:
            cutoff = time.time() - self.window
            recent = [t for t in self.requests[endpoint] if t > cutoff]
            return len(recent) >= limit


# Usage
monitor = RequestMonitor(window_seconds=60)

def make_request(client, messages):
    endpoint = "/api/v1/chat/completions"

    # Back off before the server has to tell us to
    if monitor.is_rate_limited(endpoint, limit=100):
        print("Approaching rate limit, slowing down...")
        time.sleep(1)

    monitor.record_request(endpoint)
    response = client.create_chat_completion(messages=messages)
    return response
```

## Best Practices

1. **Implement backoff** - Use exponential backoff for retries
2. **Respect `Retry-After`** - Use the `retry_after` value when the server provides one
3. **Monitor usage** - Track request rates to avoid hitting limits
4. **Queue requests** - For batch operations, throttle with a rate limiter
5. **Handle gracefully** - Show user-friendly messages when rate limited
6. **Use async** - Concurrent requests give better resource utilization

## See Also

- [Error Handling](error_handling.md) - Comprehensive error handling guide
- [Exceptions Reference](../api/exceptions.md) - Exception types and attributes