Skip to main content
Robust error handling is essential for building reliable applications that interact with AI APIs. This guide covers comprehensive strategies for handling various error scenarios and implementing intelligent retry mechanisms.

Smart Retry Strategy

import time
import random
from typing import Callable, Any
import requests

class SmartRetryHandler:
    def __init__(self, max_retries=3, base_delay=1.0):
        self.max_retries = max_retries
        self.base_delay = base_delay
    
    def exponential_backoff_with_jitter(self, attempt):
        """Exponential backoff + random jitter"""
        delay = self.base_delay * (2 ** attempt)
        jitter = random.uniform(0, delay * 0.1)
        return delay + jitter
    
    def should_retry(self, exception, attempt):
        """Determine if retry should be attempted"""
        # Network errors, timeouts, server errors -> retry
        retryable_errors = [
            "timeout", "connection", "502", "503", "504", "429"
        ]
        
        error_str = str(exception).lower()
        return any(error in error_str for error in retryable_errors)
    
    def retry(self, func: Callable, *args, **kwargs) -> Any:
        """Execute function call with retry"""
        last_exception = None
        
        for attempt in range(self.max_retries + 1):
            try:
                return func(*args, **kwargs)
                
            except Exception as e:
                last_exception = e
                
                if attempt == self.max_retries:
                    break
                
                if not self.should_retry(e, attempt):
                    break
                
                delay = self.exponential_backoff_with_jitter(attempt)
                print(f"Attempt {attempt + 1} failed: {e}")
                print(f"Retrying in {delay:.2f} seconds...")
                time.sleep(delay)
        
        raise last_exception

# Usage example
retry_handler = SmartRetryHandler(max_retries=3, base_delay=1.0)

def api_call():
    # Simulate API call that might fail
    response = requests.post(
        'https://gateway.iotex.ai/v1/chat/completions',
        headers={'Authorization': 'Bearer your-api-key'},
        json={
            'model': 'gemini-2.5-flash',
            'messages': [{'role': 'user', 'content': 'Hello'}]
        },
        timeout=10
    )
    response.raise_for_status()
    return response.json()

try:
    result = retry_handler.retry(api_call)
    print(f"Success: {result}")
except Exception as e:
    print(f"Final failure: {e}")

Comprehensive Error Classification

import requests
from enum import Enum
import logging

class ErrorType(Enum):
    AUTHENTICATION = "authentication"
    RATE_LIMIT = "rate_limit"
    SERVER_ERROR = "server_error"
    NETWORK_ERROR = "network_error"
    TIMEOUT = "timeout"
    INVALID_REQUEST = "invalid_request"
    QUOTA_EXCEEDED = "quota_exceeded"
    MODEL_UNAVAILABLE = "model_unavailable"

class APIErrorHandler:
    def __init__(self):
        self.logger = logging.getLogger(__name__)
        
    def classify_error(self, response=None, exception=None):
        """Classify the type of error encountered"""
        if response:
            status_code = response.status_code
            
            if status_code == 401:
                return ErrorType.AUTHENTICATION
            elif status_code == 429:
                return ErrorType.RATE_LIMIT
            elif status_code == 400:
                return ErrorType.INVALID_REQUEST
            elif status_code == 402:
                return ErrorType.QUOTA_EXCEEDED
            elif status_code == 503:
                return ErrorType.MODEL_UNAVAILABLE
            elif status_code >= 500:
                return ErrorType.SERVER_ERROR
        
        if exception:
            error_str = str(exception).lower()
            if "timeout" in error_str:
                return ErrorType.TIMEOUT
            elif "connection" in error_str:
                return ErrorType.NETWORK_ERROR
        
        return ErrorType.SERVER_ERROR
    
    def get_retry_strategy(self, error_type):
        """Get appropriate retry strategy based on error type"""
        strategies = {
            ErrorType.AUTHENTICATION: {"retry": False, "delay": 0},
            ErrorType.RATE_LIMIT: {"retry": True, "delay": 60, "max_retries": 5},
            ErrorType.SERVER_ERROR: {"retry": True, "delay": 2, "max_retries": 3},
            ErrorType.NETWORK_ERROR: {"retry": True, "delay": 1, "max_retries": 3},
            ErrorType.TIMEOUT: {"retry": True, "delay": 5, "max_retries": 2},
            ErrorType.INVALID_REQUEST: {"retry": False, "delay": 0},
            ErrorType.QUOTA_EXCEEDED: {"retry": False, "delay": 0},
            ErrorType.MODEL_UNAVAILABLE: {"retry": True, "delay": 30, "max_retries": 2}
        }
        return strategies.get(error_type, {"retry": True, "delay": 2, "max_retries": 3})
    
    def handle_error(self, response=None, exception=None):
        """Handle error with appropriate strategy"""
        error_type = self.classify_error(response, exception)
        strategy = self.get_retry_strategy(error_type)
        
        self.logger.error(f"Error type: {error_type.value}, Strategy: {strategy}")
        
        return {
            "error_type": error_type,
            "should_retry": strategy["retry"],
            "retry_delay": strategy["delay"],
            "max_retries": strategy.get("max_retries", 3)
        }

Advanced Retry Mechanisms

Circuit Breaker Pattern

import time
from enum import Enum
from typing import Callable, Any

class CircuitState(Enum):
    CLOSED = "closed"      # Normal operation
    OPEN = "open"          # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing if service recovered

class CircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=60, expected_exception=Exception):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception
        
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED
    
    def call(self, func: Callable, *args, **kwargs) -> Any:
        """Execute function call through circuit breaker"""
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
            else:
                raise Exception("Circuit breaker is OPEN - service unavailable")
        
        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
            
        except self.expected_exception as e:
            self._on_failure()
            raise e
    
    def _should_attempt_reset(self):
        """Check if we should try to reset the circuit"""
        return (time.time() - self.last_failure_time) >= self.recovery_timeout
    
    def _on_success(self):
        """Handle successful call"""
        self.failure_count = 0
        self.state = CircuitState.CLOSED
    
    def _on_failure(self):
        """Handle failed call"""
        self.failure_count += 1
        self.last_failure_time = time.time()
        
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

# Usage with AI API
circuit_breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30)

def make_ai_request():
    response = requests.post(
        'https://gateway.iotex.ai/v1/chat/completions',
        headers={'Authorization': 'Bearer your-api-key'},
        json={'model': 'gemini-2.5-flash', 'messages': [{'role': 'user', 'content': 'Hello'}]},
        timeout=10
    )
    response.raise_for_status()
    return response.json()

try:
    result = circuit_breaker.call(make_ai_request)
    print("Request successful")
except Exception as e:
    print(f"Request failed: {e}")

Adaptive Retry with Success Rate Tracking

import time
from collections import deque
from dataclasses import dataclass
from typing import Optional

@dataclass
class RequestResult:
    timestamp: float
    success: bool
    response_time: Optional[float] = None
    error_type: Optional[str] = None

class AdaptiveRetryHandler:
    def __init__(self, window_size=100, success_threshold=0.8):
        self.window_size = window_size
        self.success_threshold = success_threshold
        self.results = deque(maxlen=window_size)
        
    def add_result(self, success: bool, response_time: float = None, error_type: str = None):
        """Record the result of a request"""
        result = RequestResult(
            timestamp=time.time(),
            success=success,
            response_time=response_time,
            error_type=error_type
        )
        self.results.append(result)
    
    def get_success_rate(self, time_window: float = 300):  # 5 minutes
        """Calculate success rate within time window"""
        current_time = time.time()
        recent_results = [
            r for r in self.results
            if current_time - r.timestamp <= time_window
        ]
        
        if not recent_results:
            return 1.0  # Assume good if no recent data
        
        successful = sum(1 for r in recent_results if r.success)
        return successful / len(recent_results)
    
    def should_retry(self, attempt: int, max_retries: int = 3) -> bool:
        """Determine if retry should be attempted based on current success rate"""
        if attempt >= max_retries:
            return False
        
        success_rate = self.get_success_rate()
        
        # More aggressive retries when success rate is high
        if success_rate >= self.success_threshold:
            return True
        
        # Conservative retries when success rate is low
        return attempt < (max_retries // 2)
    
    def get_retry_delay(self, attempt: int) -> float:
        """Calculate adaptive retry delay based on current conditions"""
        base_delay = 2 ** attempt  # Exponential backoff
        success_rate = self.get_success_rate()
        
        # Longer delays when success rate is low
        if success_rate < 0.5:
            return base_delay * 2
        elif success_rate < self.success_threshold:
            return base_delay * 1.5
        else:
            return base_delay
    
    def execute_with_retry(self, func, *args, **kwargs):
        """Execute function with adaptive retry strategy"""
        max_retries = 3
        
        for attempt in range(max_retries + 1):
            start_time = time.time()
            
            try:
                result = func(*args, **kwargs)
                response_time = time.time() - start_time
                self.add_result(success=True, response_time=response_time)
                return result
                
            except Exception as e:
                response_time = time.time() - start_time
                error_type = type(e).__name__
                self.add_result(success=False, response_time=response_time, error_type=error_type)
                
                if not self.should_retry(attempt, max_retries):
                    raise e
                
                delay = self.get_retry_delay(attempt)
                print(f"Attempt {attempt + 1} failed, retrying in {delay:.1f}s (success rate: {self.get_success_rate():.2%})")
                time.sleep(delay)
        
        raise Exception("Max retries exceeded")

Error Recovery Strategies

Graceful Degradation

class GracefulAIClient:
    def __init__(self, primary_api_key, fallback_api_key=None):
        self.primary_api_key = primary_api_key
        self.fallback_api_key = fallback_api_key
        self.retry_handler = SmartRetryHandler()
        self.error_handler = APIErrorHandler()
        
    def chat_with_fallback(self, messages, model="gemini-2.5-flash"):
        """Chat with automatic fallback strategies"""
        
        # Strategy 1: Try primary API with retry
        try:
            return self._make_request(self.primary_api_key, messages, model)
        except Exception as e:
            print(f"Primary API failed: {e}")
            
            # Strategy 2: Try fallback API key if available
            if self.fallback_api_key:
                try:
                    print("Trying fallback API key...")
                    return self._make_request(self.fallback_api_key, messages, model)
                except Exception as e2:
                    print(f"Fallback API also failed: {e2}")
            
            # Strategy 3: Try simpler model
            if model != "gemini-2.5-flash":
                try:
                    print("Trying simpler model...")
                    return self._make_request(self.primary_api_key, messages, "gemini-2.5-flash")
                except Exception as e3:
                    print(f"Simpler model also failed: {e3}")
            
            # Strategy 4: Provide cached or default response
            return self._get_fallback_response(messages)
    
    def _make_request(self, api_key, messages, model):
        """Make API request with error handling"""
        def api_call():
            response = requests.post(
                'https://gateway.iotex.ai/v1/chat/completions',
                headers={'Authorization': f'Bearer {api_key}'},
                json={'model': model, 'messages': messages},
                timeout=30
            )
            response.raise_for_status()
            return response.json()
        
        return self.retry_handler.retry(api_call)
    
    def _get_fallback_response(self, messages):
        """Provide fallback response when all else fails"""
        return {
            "choices": [{
                "message": {
                    "role": "assistant",
                    "content": "I apologize, but I'm experiencing technical difficulties and cannot process your request at the moment. Please try again later."
                }
            }],
            "usage": {"total_tokens": 0}
        }

Monitoring and Logging

import logging
import json
from datetime import datetime

class APIMonitor:
    def __init__(self, log_file="api_errors.log"):
        self.logger = logging.getLogger("api_monitor")
        handler = logging.FileHandler(log_file)
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)
        
    def log_request(self, request_data, response_data=None, error=None, response_time=None):
        """Log API request details"""
        log_entry = {
            "timestamp": datetime.now().isoformat(),
            "request": request_data,
            "response_time": response_time,
            "success": error is None
        }
        
        if response_data:
            log_entry["response"] = response_data
            
        if error:
            log_entry["error"] = {
                "type": type(error).__name__,
                "message": str(error)
            }
            self.logger.error(json.dumps(log_entry))
        else:
            self.logger.info(json.dumps(log_entry))
    
    def log_retry_attempt(self, attempt, max_attempts, delay, error):
        """Log retry attempts"""
        log_entry = {
            "event": "retry_attempt",
            "attempt": attempt,
            "max_attempts": max_attempts,
            "delay": delay,
            "error": str(error)
        }
        self.logger.warning(json.dumps(log_entry))

Best Practices for Error Handling

  1. Classify Errors Properly: Different error types require different handling strategies
  2. Implement Exponential Backoff: Avoid overwhelming the server during outages
  3. Use Circuit Breakers: Fail fast when the service is consistently unavailable
  4. Plan for Graceful Degradation: Provide fallback options when possible
  5. Monitor and Alert: Track error rates and set up appropriate alerts
  6. Log Comprehensively: Capture enough detail for debugging without logging sensitive data
  7. Test Error Scenarios: Regularly test your error handling in staging environments
  8. Set Reasonable Timeouts: Balance between allowing enough time and failing fast
  9. Respect Rate Limits: Implement proper rate limiting to avoid 429 errors
  10. Document Error Responses: Keep clear documentation of how different errors are handled