Smart Retry Strategy
import time
import random
from typing import Callable, Any
import requests

class SmartRetryHandler:
    def __init__(self, max_retries=3, base_delay=1.0):
        self.max_retries = max_retries
        self.base_delay = base_delay

    def exponential_backoff_with_jitter(self, attempt):
        """Exponential backoff + random jitter"""
        delay = self.base_delay * (2 ** attempt)
        jitter = random.uniform(0, delay * 0.1)
        return delay + jitter

    def should_retry(self, exception, attempt):
        """Determine if retry should be attempted"""
        # Network errors, timeouts, server errors -> retry
        retryable_errors = [
            "timeout", "connection", "502", "503", "504", "429"
        ]
        error_str = str(exception).lower()
        return any(error in error_str for error in retryable_errors)

    def retry(self, func: Callable, *args, **kwargs) -> Any:
        """Execute function call with retry"""
        last_exception = None
        for attempt in range(self.max_retries + 1):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                last_exception = e
                if attempt == self.max_retries:
                    break
                if not self.should_retry(e, attempt):
                    break
                delay = self.exponential_backoff_with_jitter(attempt)
                print(f"Attempt {attempt + 1} failed: {e}")
                print(f"Retrying in {delay:.2f} seconds...")
                time.sleep(delay)
        raise last_exception

# Usage example
retry_handler = SmartRetryHandler(max_retries=3, base_delay=1.0)

def api_call():
    # Simulate an API call that might fail
    response = requests.post(
        'https://gateway.iotex.ai/v1/chat/completions',
        headers={'Authorization': 'Bearer your-api-key'},
        json={
            'model': 'gemini-2.5-flash',
            'messages': [{'role': 'user', 'content': 'Hello'}]
        },
        timeout=10
    )
    response.raise_for_status()
    return response.json()

try:
    result = retry_handler.retry(api_call)
    print(f"Success: {result}")
except Exception as e:
    print(f"Final failure: {e}")
Comprehensive Error Classification
import requests
from enum import Enum
import logging

class ErrorType(Enum):
    AUTHENTICATION = "authentication"
    RATE_LIMIT = "rate_limit"
    SERVER_ERROR = "server_error"
    NETWORK_ERROR = "network_error"
    TIMEOUT = "timeout"
    INVALID_REQUEST = "invalid_request"
    QUOTA_EXCEEDED = "quota_exceeded"
    MODEL_UNAVAILABLE = "model_unavailable"

class APIErrorHandler:
    def __init__(self):
        self.logger = logging.getLogger(__name__)

    def classify_error(self, response=None, exception=None):
        """Classify the type of error encountered"""
        # Note: use "is not None" here; a requests.Response is falsy for 4xx/5xx status codes,
        # so a plain "if response:" would skip classification for exactly the responses we care about.
        if response is not None:
            status_code = response.status_code
            if status_code == 401:
                return ErrorType.AUTHENTICATION
            elif status_code == 429:
                return ErrorType.RATE_LIMIT
            elif status_code == 400:
                return ErrorType.INVALID_REQUEST
            elif status_code == 402:
                return ErrorType.QUOTA_EXCEEDED
            elif status_code == 503:
                return ErrorType.MODEL_UNAVAILABLE
            elif status_code >= 500:
                return ErrorType.SERVER_ERROR
        if exception is not None:
            error_str = str(exception).lower()
            if "timeout" in error_str:
                return ErrorType.TIMEOUT
            elif "connection" in error_str:
                return ErrorType.NETWORK_ERROR
        return ErrorType.SERVER_ERROR

    def get_retry_strategy(self, error_type):
        """Get appropriate retry strategy based on error type"""
        strategies = {
            ErrorType.AUTHENTICATION: {"retry": False, "delay": 0},
            ErrorType.RATE_LIMIT: {"retry": True, "delay": 60, "max_retries": 5},
            ErrorType.SERVER_ERROR: {"retry": True, "delay": 2, "max_retries": 3},
            ErrorType.NETWORK_ERROR: {"retry": True, "delay": 1, "max_retries": 3},
            ErrorType.TIMEOUT: {"retry": True, "delay": 5, "max_retries": 2},
            ErrorType.INVALID_REQUEST: {"retry": False, "delay": 0},
            ErrorType.QUOTA_EXCEEDED: {"retry": False, "delay": 0},
            ErrorType.MODEL_UNAVAILABLE: {"retry": True, "delay": 30, "max_retries": 2}
        }
        return strategies.get(error_type, {"retry": True, "delay": 2, "max_retries": 3})

    def handle_error(self, response=None, exception=None):
        """Handle error with appropriate strategy"""
        error_type = self.classify_error(response, exception)
        strategy = self.get_retry_strategy(error_type)
        self.logger.error(f"Error type: {error_type.value}, Strategy: {strategy}")
        return {
            "error_type": error_type,
            "should_retry": strategy["retry"],
            "retry_delay": strategy["delay"],
            "max_retries": strategy.get("max_retries", 3)
        }
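For reference, here is a minimal usage sketch (not part of the original example) showing how the classifier can be applied to a failed response; the endpoint, model, and API key placeholder mirror the requests used elsewhere on this page:

# Usage sketch (illustrative; reuses the APIErrorHandler defined above)
import requests

error_handler = APIErrorHandler()

response = requests.post(
    'https://gateway.iotex.ai/v1/chat/completions',
    headers={'Authorization': 'Bearer your-api-key'},
    json={'model': 'gemini-2.5-flash', 'messages': [{'role': 'user', 'content': 'Hello'}]},
    timeout=10
)

if response.status_code != 200:
    decision = error_handler.handle_error(response=response)
    if decision["should_retry"]:
        print(f"Retrying after {decision['retry_delay']}s (error: {decision['error_type'].value})")
    else:
        print(f"Not retrying: {decision['error_type'].value}")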
Advanced Retry Mechanisms
Circuit Breaker Pattern
import time
import requests
from enum import Enum
from typing import Callable, Any

class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing if service recovered

class CircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=60, expected_exception=Exception):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    def call(self, func: Callable, *args, **kwargs) -> Any:
        """Execute function call through circuit breaker"""
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
            else:
                raise Exception("Circuit breaker is OPEN - service unavailable")
        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
        except self.expected_exception as e:
            self._on_failure()
            raise e

    def _should_attempt_reset(self):
        """Check if we should try to reset the circuit"""
        return (time.time() - self.last_failure_time) >= self.recovery_timeout

    def _on_success(self):
        """Handle successful call"""
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def _on_failure(self):
        """Handle failed call"""
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

# Usage with AI API
circuit_breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30)

def make_ai_request():
    response = requests.post(
        'https://gateway.iotex.ai/v1/chat/completions',
        headers={'Authorization': 'Bearer your-api-key'},
        json={'model': 'gemini-2.5-flash', 'messages': [{'role': 'user', 'content': 'Hello'}]},
        timeout=10
    )
    response.raise_for_status()
    return response.json()

try:
    result = circuit_breaker.call(make_ai_request)
    print("Request successful")
except Exception as e:
    print(f"Request failed: {e}")
Adaptive Retry with Success Rate Tracking
import time
from collections import deque
from dataclasses import dataclass
from typing import Optional

@dataclass
class RequestResult:
    timestamp: float
    success: bool
    response_time: Optional[float] = None
    error_type: Optional[str] = None

class AdaptiveRetryHandler:
    def __init__(self, window_size=100, success_threshold=0.8):
        self.window_size = window_size
        self.success_threshold = success_threshold
        self.results = deque(maxlen=window_size)

    def add_result(self, success: bool, response_time: Optional[float] = None, error_type: Optional[str] = None):
        """Record the result of a request"""
        result = RequestResult(
            timestamp=time.time(),
            success=success,
            response_time=response_time,
            error_type=error_type
        )
        self.results.append(result)

    def get_success_rate(self, time_window: float = 300):  # 5 minutes
        """Calculate success rate within time window"""
        current_time = time.time()
        recent_results = [
            r for r in self.results
            if current_time - r.timestamp <= time_window
        ]
        if not recent_results:
            return 1.0  # Assume good if no recent data
        successful = sum(1 for r in recent_results if r.success)
        return successful / len(recent_results)

    def should_retry(self, attempt: int, max_retries: int = 3) -> bool:
        """Determine if retry should be attempted based on current success rate"""
        if attempt >= max_retries:
            return False
        success_rate = self.get_success_rate()
        # More aggressive retries when success rate is high
        if success_rate >= self.success_threshold:
            return True
        # Conservative retries when success rate is low
        return attempt < (max_retries // 2)

    def get_retry_delay(self, attempt: int) -> float:
        """Calculate adaptive retry delay based on current conditions"""
        base_delay = 2 ** attempt  # Exponential backoff
        success_rate = self.get_success_rate()
        # Longer delays when success rate is low
        if success_rate < 0.5:
            return base_delay * 2
        elif success_rate < self.success_threshold:
            return base_delay * 1.5
        else:
            return base_delay

    def execute_with_retry(self, func, *args, **kwargs):
        """Execute function with adaptive retry strategy"""
        max_retries = 3
        for attempt in range(max_retries + 1):
            start_time = time.time()
            try:
                result = func(*args, **kwargs)
                response_time = time.time() - start_time
                self.add_result(success=True, response_time=response_time)
                return result
            except Exception as e:
                response_time = time.time() - start_time
                error_type = type(e).__name__
                self.add_result(success=False, response_time=response_time, error_type=error_type)
                if not self.should_retry(attempt, max_retries):
                    raise e
                delay = self.get_retry_delay(attempt)
                print(f"Attempt {attempt + 1} failed, retrying in {delay:.1f}s (success rate: {self.get_success_rate():.2%})")
                time.sleep(delay)
        raise Exception("Max retries exceeded")
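A minimal usage sketch, assuming the api_call function defined in the Smart Retry Strategy example above:

# Usage sketch (assumes api_call from the Smart Retry Strategy example)
adaptive_handler = AdaptiveRetryHandler(window_size=100, success_threshold=0.8)

try:
    result = adaptive_handler.execute_with_retry(api_call)
    print(f"Success: {result}")
except Exception as e:
    print(f"Final failure: {e}")

print(f"Current success rate: {adaptive_handler.get_success_rate():.2%}")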
Error Recovery Strategies
Graceful Degradation
import requests

class GracefulAIClient:
    def __init__(self, primary_api_key, fallback_api_key=None):
        self.primary_api_key = primary_api_key
        self.fallback_api_key = fallback_api_key
        self.retry_handler = SmartRetryHandler()  # defined in the Smart Retry Strategy example above
        self.error_handler = APIErrorHandler()    # defined in the Error Classification example above

    def chat_with_fallback(self, messages, model="gemini-2.5-flash"):
        """Chat with automatic fallback strategies"""
        # Strategy 1: Try primary API with retry
        try:
            return self._make_request(self.primary_api_key, messages, model)
        except Exception as e:
            print(f"Primary API failed: {e}")

        # Strategy 2: Try fallback API key if available
        if self.fallback_api_key:
            try:
                print("Trying fallback API key...")
                return self._make_request(self.fallback_api_key, messages, model)
            except Exception as e2:
                print(f"Fallback API also failed: {e2}")

        # Strategy 3: Try simpler model
        if model != "gemini-2.5-flash":
            try:
                print("Trying simpler model...")
                return self._make_request(self.primary_api_key, messages, "gemini-2.5-flash")
            except Exception as e3:
                print(f"Simpler model also failed: {e3}")

        # Strategy 4: Provide cached or default response
        return self._get_fallback_response(messages)

    def _make_request(self, api_key, messages, model):
        """Make API request with error handling"""
        def api_call():
            response = requests.post(
                'https://gateway.iotex.ai/v1/chat/completions',
                headers={'Authorization': f'Bearer {api_key}'},
                json={'model': model, 'messages': messages},
                timeout=30
            )
            response.raise_for_status()
            return response.json()
        return self.retry_handler.retry(api_call)

    def _get_fallback_response(self, messages):
        """Provide fallback response when all else fails"""
        return {
            "choices": [{
                "message": {
                    "role": "assistant",
                    "content": "I apologize, but I'm experiencing technical difficulties and cannot process your request at the moment. Please try again later."
                }
            }],
            "usage": {"total_tokens": 0}
        }
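A short usage sketch; the key values are placeholders, and the response is assumed to follow the same choices structure returned by _get_fallback_response:

# Usage sketch (placeholder API keys)
client = GracefulAIClient(primary_api_key="your-api-key", fallback_api_key="your-backup-key")

result = client.chat_with_fallback(
    messages=[{'role': 'user', 'content': 'Hello'}],
    model='gemini-2.5-flash'
)
print(result["choices"][0]["message"]["content"])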
Monitoring and Logging
import logging
import json
from datetime import datetime

class APIMonitor:
    def __init__(self, log_file="api_errors.log"):
        self.logger = logging.getLogger("api_monitor")
        handler = logging.FileHandler(log_file)
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)

    def log_request(self, request_data, response_data=None, error=None, response_time=None):
        """Log API request details"""
        log_entry = {
            "timestamp": datetime.now().isoformat(),
            "request": request_data,
            "response_time": response_time,
            "success": error is None
        }
        if response_data:
            log_entry["response"] = response_data
        if error:
            log_entry["error"] = {
                "type": type(error).__name__,
                "message": str(error)
            }
            self.logger.error(json.dumps(log_entry))
        else:
            self.logger.info(json.dumps(log_entry))

    def log_retry_attempt(self, attempt, max_attempts, delay, error):
        """Log retry attempts"""
        log_entry = {
            "event": "retry_attempt",
            "attempt": attempt,
            "max_attempts": max_attempts,
            "delay": delay,
            "error": str(error)
        }
        self.logger.warning(json.dumps(log_entry))
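A brief usage sketch; it reuses retry_handler and api_call from the Smart Retry Strategy example, and the request payload shown is illustrative:

# Usage sketch (illustrative payload; retry_handler and api_call come from the Smart Retry Strategy example)
import time

monitor = APIMonitor(log_file="api_errors.log")
request_data = {"model": "gemini-2.5-flash", "messages": [{"role": "user", "content": "Hello"}]}

start = time.time()
try:
    result = retry_handler.retry(api_call)
    monitor.log_request(request_data, response_data=result, response_time=time.time() - start)
except Exception as e:
    monitor.log_request(request_data, error=e, response_time=time.time() - start)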
Best Practices for Error Handling
- Classify Errors Properly: Different error types require different handling strategies
- Implement Exponential Backoff: Avoid overwhelming the server during outages
- Use Circuit Breakers: Fail fast when the service is consistently unavailable
- Plan for Graceful Degradation: Provide fallback options when possible
- Monitor and Alert: Track error rates and set up appropriate alerts
- Log Comprehensively: Capture enough detail for debugging without logging sensitive data
- Test Error Scenarios: Regularly test your error handling in staging environments
- Set Reasonable Timeouts: Balance between allowing enough time and failing fast
- Respect Rate Limits: Implement proper client-side rate limiting to avoid 429 errors (see the token-bucket sketch after this list)
- Document Error Responses: Keep clear documentation of how different errors are handled
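As referenced in the rate-limit item above, here is a minimal client-side token-bucket sketch; the rate and capacity values are illustrative defaults, not actual gateway limits:

import time
import threading

class TokenBucketRateLimiter:
    """Simple client-side token bucket; the rate and capacity here are illustrative."""
    def __init__(self, rate_per_second=2.0, capacity=10):
        self.rate = rate_per_second      # tokens refilled per second
        self.capacity = capacity         # maximum burst size
        self.tokens = capacity
        self.last_refill = time.monotonic()
        self.lock = threading.Lock()

    def acquire(self):
        """Block until a token is available, then consume it."""
        while True:
            with self.lock:
                now = time.monotonic()
                # Refill tokens based on elapsed time, capped at capacity
                self.tokens = min(self.capacity, self.tokens + (now - self.last_refill) * self.rate)
                self.last_refill = now
                if self.tokens >= 1:
                    self.tokens -= 1
                    return
                wait = (1 - self.tokens) / self.rate
            time.sleep(wait)

# Usage sketch: throttle calls before handing them to the retry handler
rate_limiter = TokenBucketRateLimiter(rate_per_second=2.0, capacity=10)

def rate_limited_api_call():
    rate_limiter.acquire()
    return api_call()  # api_call from the Smart Retry Strategy example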