This metrics tool terrifies bad developers

Start free trial
SitePoint Premium
Stay Relevant and Grow Your Career in Tech
  • Premium Results
  • Publish articles on SitePoint
  • Daily curated jobs
  • Learning Paths
  • Discounts to dev tools
Start Free Trial

7 Day Free Trial. Cancel Anytime.

Production systems that depend on the Claude API face a fundamental reliability challenge. A Claude API circuit breaker pattern prevents a single provider outage from cascading through dependent services, exhausting thread pools, and degrading the entire application stack. This article provides a complete, production-ready circuit breaker implementation with multi-provider failover across Claude, OpenAI, and Ollama, backed by Redis for distributed state management.

Table of Contents

Prerequisites

  • Python 3.10+ (the examples use built-in generic types per PEP 585 and redis.asyncio)
  • Redis 6.0+ — e.g., docker run -p 6379:6379 redis:7
  • Install dependencies:
pip install anthropic openai ollama redis fakeredis pytest pytest-asyncio
  • Environment variables for API keys:
export ANTHROPIC_API_KEY="your-key-here"
export OPENAI_API_KEY="your-key-here"

Never hardcode API credentials in source files.

  • Ollama setup (for local fallback):
ollama serve        # Start the Ollama service
ollama pull llama3:8b  # Pull the specific model variant
  • Redis client instantiation (used throughout the article):
import redis.asyncio as redis

redis_client = redis.Redis(host="localhost", port=6379, db=0, decode_responses=False)
# Production deployments should use redis.asyncio.ConnectionPool with SSL and AUTH.
  • pytest configuration — create a pytest.ini in your project root:
[pytest]
asyncio_mode = auto

Why Claude API Integrations Need Circuit Breakers

The Cascade Failure Problem in AI Infrastructure

When the Claude API begins returning errors or timing out, errors rarely stay contained within the failing service. Consider a production system where multiple services depend on Claude for text generation. The API starts returning 529 overload errors. Each request now hangs for the full timeout duration instead of returning within its normal window. Thread pools fill. Connection pools exhaust. Upstream services that depend on those threads begin queuing. On a typical deployment of 8-12 workers across a few containers, a single API degradation propagates across the entire service mesh within a few minutes.

Naive retry logic accelerates the problem rather than solving it. When every failed request triggers two or three retries, the system amplifies load against an already struggling provider. This creates retry storms: the very traffic pattern that makes overloaded APIs slower to recover. The Anthropic API surfaces several distinct failure modes that demand different handling. Status codes 529 and 500 both point to the provider side, but 529 signals capacity pressure while 500 signals an internal fault. A 429 means the caller has exceeded its rate allocation. Network timeouts may indicate infrastructure issues entirely outside the API provider's control. Any of these can trigger cascade failures if left unmanaged.

Naive retry logic accelerates the problem rather than solving it. When every failed request triggers two or three retries, the system amplifies load against an already struggling provider.

Circuit Breaker Pattern Fundamentals

The circuit breaker pattern, formalized by Michael Nygard in Release It!, provides a mechanism to detect sustained failures and short-circuit requests before they reach a degraded service. The pattern defines three states.

In the normal case, the circuit is Closed: requests pass through to the downstream service, and failures accumulate against a threshold. When failures breach that threshold, the circuit transitions to Open, immediately routing all requests to a fallback without contacting the failing service. This eliminates wasted latency, prevents retry storms, and gives the provider time to recover. After a configured recovery timeout expires, the circuit enters Half-Open and allows a limited number of probe requests through to test whether the service has recovered. Success resets the breaker to Closed; failure reopens the circuit.

This pattern is uniquely suited to LLM APIs. These APIs exhibit high per-request latency (often seconds, not milliseconds), variable load characteristics driven by token counts, and meaningful cost per request. Failing fast via an open circuit avoids both the latency penalty and the token cost of requests that will never succeed.

from enum import Enum
from dataclasses import dataclass


class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


@dataclass
class CircuitBreakerConfig:
    failure_threshold: int = 5
    recovery_timeout: float = 30.0
    half_open_max_calls: int = 2

Designing the Circuit Breaker for LLM APIs

Architecture Overview

The system architecture places a circuit breaker between the client application and each LLM provider. The client sends requests to a resilient LLM client, which maintains individual circuit breakers for Claude API, OpenAI API, and a local Ollama instance. Redis stores shared circuit state, enabling multiple application workers and containers (e.g., 8 Gunicorn workers across 3 Kubernetes pods) to share a consistent view of each provider's health. State transitions flow from Closed to Open when failures breach the threshold, from Open to Half-Open after the recovery timeout TTL expires, and from Half-Open back to Closed on probe success or back to Open on probe failure.

┌────────┐     ┌─────────────────────┐     ┌─────────────────┐
│ Client ├────►│ ResilientLLMClient  ├──┬─►│ Claude API      │
└────────┘     │                     │  │  └─────────────────┘
               │  ┌───────────────┐  │  │  ┌─────────────────┐
               │  │CircuitBreaker ├──┼──┼─►│ OpenAI API      │
               │  │  (per provider)│  │  │  └─────────────────┘
               │  └───────┬───────┘  │  │  ┌─────────────────┐
               │          │          │  └─►│ Ollama (local)  │
               └──────────┼──────────┘     └─────────────────┘
                    ┌─────▼─────┐
                    │   Redis   │
                    │ (shared   │
                    │  state)   │
                    └───────────┘

State transitions:
  Closed ──[failures ≥ threshold]──► Open
  Open ──[TTL expires]──► Half-Open
  Half-Open ──[probe succeeds]──► Closed
  Half-Open ──[probe fails]──► Open

Choosing a State Backend: Local vs. Redis

For single-process applications, in-memory state tracking suffices. A dictionary holding failure counts and timestamps works. But production deployments typically run multiple workers or containers, and in-memory state creates a fragmented view: one worker might have its circuit open while another keeps hammering the failing API.

Redis-backed state solves this. All workers read and write to a shared set of keys. The key schema follows a consistent pattern: circuit:{provider_name}:state holds the current circuit state, circuit:{provider_name}:failure_count tracks consecutive failures, and circuit:{provider_name}:last_failure_time records when the most recent failure occurred. This schema allows each provider to maintain an independent circuit breaker while sharing state across the entire fleet.

Defining Failure Conditions for AI APIs

Not every error should trip the circuit breaker. HTTP status codes 429, 500, 502, 503, and 529 indicate service-side problems and should increment the failure counter. Timeout exceptions and connection errors also count. The Anthropic SDK raises anthropic.APIStatusError for HTTP-level failures, which the breaker should catch and classify.

The breaker should not count 400 Bad Request and 401 Unauthorized errors as failures. These indicate caller-side bugs: malformed prompts or invalid API keys. Tripping the circuit breaker on client errors would cause the system to fail over unnecessarily, routing requests to a secondary provider that will likely encounter the same caller-side problem.

Token-budget overruns and anomalous latency spikes warrant tracking on monitoring dashboards as early stress indicators, but the implementation should not trip the breaker on these alone. Instead, feed them into a separate alerting pipeline that flags degraded provider health before the breaker's failure threshold kicks in.

import asyncio
import time
import hashlib
import math
import redis.asyncio as redis
import logging

logger = logging.getLogger(__name__)


class CircuitOpenError(Exception):
    pass


class AllProvidersFailedError(Exception):
    pass


class CircuitBreaker:
    TRANSIENT_STATUS_CODES = {429, 500, 502, 503, 529}

    RECORD_FAILURE_SCRIPT = """
local count_key = KEYS[1]
local state_key = KEYS[2]
local time_key  = KEYS[3]
local half_open_key = KEYS[4]
local threshold        = tonumber(ARGV[1])
local recovery_timeout = math.floor(tonumber(ARGV[2]))
local now = ARGV[3]
local count_ttl = tonumber(ARGV[4])

local count = redis.call('INCR', count_key)
redis.call('EXPIRE', count_key, count_ttl)
redis.call('SET', time_key, now)
if count >= threshold then
    redis.call('SET',    state_key, 'open')
    redis.call('EXPIRE', state_key, recovery_timeout)
    redis.call('DEL', count_key)
    redis.call('DEL', half_open_key)
    return 1
end
return 0
"""

    def __init__(
        self,
        provider_name: str,
        redis_client: redis.Redis,
        config: CircuitBreakerConfig = None,
    ):
        self.provider_name = provider_name
        self.redis = redis_client
        self.config = config or CircuitBreakerConfig()
        self._key_prefix = f"circuit:{provider_name}"
        self._failure_script = self.redis.register_script(
            self.RECORD_FAILURE_SCRIPT
        )

    async def _get_state(self) -> CircuitState:
        state = await self.redis.get(f"{self._key_prefix}:state")
        if state is None:
            # Key absent: either never opened, or TTL expired from OPEN.
            # Check for an active half-open probe window.
            half_open_key = f"{self._key_prefix}:half_open_calls"
            existing = await self.redis.get(half_open_key)
            if existing is not None:
                return CircuitState.HALF_OPEN
            # If a last_failure_time exists, the circuit was previously open
            # and TTL has expired — transition to Half-Open for a probe.
            last_failure = await self.redis.get(
                f"{self._key_prefix}:last_failure_time"
            )
            if last_failure is not None:
                await self._set_state(
                    CircuitState.HALF_OPEN,
                    ttl=self.config.recovery_timeout,
                )
                return CircuitState.HALF_OPEN
            return CircuitState.CLOSED
        return CircuitState(
            state.decode() if isinstance(state, bytes) else state
        )

    async def _set_state(self, state: CircuitState, ttl: float = None):
        key = f"{self._key_prefix}:state"
        if ttl:
            await self.redis.set(key, state.value, ex=int(ttl))
        else:
            await self.redis.set(key, state.value)

    async def _record_failure(self):
        count_key = f"{self._key_prefix}:failure_count"
        state_key = f"{self._key_prefix}:state"
        time_key = f"{self._key_prefix}:last_failure_time"
        half_open_key = f"{self._key_prefix}:half_open_calls"
        count_ttl = int(self.config.recovery_timeout * 3)

        transitioned = await self._failure_script(
            keys=[count_key, state_key, time_key, half_open_key],
            args=[
                self.config.failure_threshold,
                int(self.config.recovery_timeout),
                str(time.time()),
                count_ttl,
            ],
        )
        if transitioned:
            logger.warning(
                "Circuit OPEN for %s", self.provider_name
            )

    async def _record_success(self):
        await self._set_state(CircuitState.CLOSED)
        await self.redis.delete(f"{self._key_prefix}:failure_count")
        await self.redis.delete(f"{self._key_prefix}:half_open_calls")
        await self.redis.delete(f"{self._key_prefix}:last_failure_time")
        logger.info("Circuit CLOSED for %s", self.provider_name)

    def _is_transient_failure(self, exc: Exception) -> bool:
        status = getattr(exc, "status_code", None)
        if status and status in self.TRANSIENT_STATUS_CODES:
            return True
        if isinstance(exc, (asyncio.TimeoutError, ConnectionError, OSError)):
            return True
        return False

    async def call(self, func, *args, **kwargs):
        timeout: float = kwargs.pop("circuit_timeout", 30.0)
        state = await self._get_state()

        if state == CircuitState.OPEN:
            raise CircuitOpenError(f"{self.provider_name} circuit is OPEN")

        if state == CircuitState.HALF_OPEN:
            calls_key = f"{self._key_prefix}:half_open_calls"
            # Use SETNX to allow exactly one probe at a time.
            acquired = await self.redis.set(
                calls_key, "1",
                ex=int(self.config.recovery_timeout * 2),
                nx=True,
            )
            if not acquired:
                raise CircuitOpenError(
                    f"{self.provider_name} half-open probe already in progress"
                )

        try:
            result = await asyncio.wait_for(
                func(*args, **kwargs), timeout=timeout
            )
            if state == CircuitState.HALF_OPEN:
                await self._record_success()
            return result
        except asyncio.TimeoutError as exc:
            if state == CircuitState.HALF_OPEN:
                await self.redis.delete(
                    f"{self._key_prefix}:half_open_calls"
                )
            if self._is_transient_failure(exc):
                await self._record_failure()
            raise
        except Exception as exc:
            if state == CircuitState.HALF_OPEN:
                # Release probe lock so the next call can retry.
                await self.redis.delete(
                    f"{self._key_prefix}:half_open_calls"
                )
            if self._is_transient_failure(exc):
                await self._record_failure()
            raise

Implementing Multi-Provider Failover

Defining the Provider Abstraction Layer

Each LLM provider exposes a different SDK with different response shapes. A common interface normalizes this so the failover chain can treat providers interchangeably. The abstraction defines a single async method that accepts a prompt and returns a string.

Security note: Pass api_key from the environment -- e.g., api_key=os.environ['ANTHROPIC_API_KEY']. Never hardcode credentials in source files.

import os
from abc import ABC, abstractmethod
import anthropic
import openai
import ollama as ollama_sdk


class LLMProvider(ABC):
    name: str

    @abstractmethod
    async def complete(self, prompt: str, **kwargs) -> str:
        ...


class ClaudeProvider(LLMProvider):
    name = "claude"

    def __init__(
        self,
        api_key: str,
        model: str = "claude-3-5-sonnet-20241022",  # Verify current model at docs.anthropic.com/en/docs/about-claude/models
    ):
        self.client = anthropic.AsyncAnthropic(api_key=api_key)
        self.model = model

    async def complete(self, prompt: str, **kwargs) -> str:
        response = await self.client.messages.create(
            model=self.model,
            max_tokens=kwargs.get("max_tokens", 1024),
            messages=[{"role": "user", "content": prompt}],
        )
        if not response.content:
            raise ValueError(
                f"Claude returned empty content block "
                f"(stop_reason={response.stop_reason!r})"
            )
        return response.content[0].text


class OpenAIProvider(LLMProvider):
    name = "openai"

    def __init__(self, api_key: str, model: str = "gpt-4o"):
        self.client = openai.AsyncOpenAI(api_key=api_key)
        self.model = model

    async def complete(self, prompt: str, **kwargs) -> str:
        response = await self.client.chat.completions.create(
            model=self.model,
            max_tokens=kwargs.get("max_tokens", 1024),
            messages=[{"role": "user", "content": prompt}],
        )
        if not response.choices:
            raise ValueError("OpenAI returned no choices")
        return response.choices[0].message.content


class OllamaProvider(LLMProvider):
    name = "ollama"

    def __init__(self, model: str = "llama3:8b"):
        self.model = model
        self.client = ollama_sdk.AsyncClient()

    async def complete(self, prompt: str, **kwargs) -> str:
        response = await self.client.chat(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
        )
        return response.message.content

The Failover Chain: Priority-Based Routing

The providers are ordered by preference. Claude is the primary: best output quality, production default. OpenAI acts as the secondary: comparable cloud-hosted quality, independent infrastructure. Ollama runs locally as the tertiary: degraded output quality, but available when ollama serve is active and the required model has been pulled with ollama pull llama3:8b.

Each provider gets its own circuit breaker instance. The resilient client iterates through the ordered list, checking circuit state before attempting each call. If a provider's circuit is open, the client skips it immediately and moves to the next. If a call fails with a transient error, the circuit breaker records the failure and the client proceeds down the chain.

from dataclasses import dataclass
from typing import Optional
import hashlib
import redis.asyncio as redis

logger = logging.getLogger(__name__)


@dataclass
class FallbackResponse:
    content: str
    provider: str
    degraded: bool
    cache_hit: bool


class ResilientLLMClient:
    def __init__(
        self,
        providers: list[LLMProvider],
        redis_client: redis.Redis,
        cache_ttl: int = 3600,
    ):
        self.providers = providers
        self.redis = redis_client
        self.cache_ttl = cache_ttl
        self.breakers = {
            p.name: CircuitBreaker(p.name, redis_client)
            for p in providers
        }

    async def complete(self, prompt: str, **kwargs) -> FallbackResponse:
        last_exception: Optional[Exception] = None

        for provider in self.providers:
            breaker = self.breakers[provider.name]
            try:
                logger.info("Attempting provider: %s", provider.name)
                result = await breaker.call(
                    provider.complete, prompt, **kwargs
                )
                degraded = provider.name != self.providers[0].name
                await self._cache_response(prompt, result)
                return FallbackResponse(
                    content=result,
                    provider=provider.name,
                    degraded=degraded,
                    cache_hit=False,
                )
            except CircuitOpenError:
                logger.warning(
                    "Skipping %s: circuit open", provider.name
                )
                continue
            except Exception as exc:
                logger.error(
                    "Provider %s failed: %s", provider.name, exc
                )
                last_exception = exc
                continue

        # All providers exhausted — try cache
        cached = await self._get_cached_response(prompt)
        if cached:
            logger.warning("All providers down, returning cached response")
            return FallbackResponse(
                content=cached,
                provider="cache",
                degraded=True,
                cache_hit=True,
            )

        if last_exception is not None:
            logger.error(
                "All providers failed. Last error detail: %s", last_exception
            )
        raise AllProvidersFailedError(
            "All configured LLM providers are unavailable."
        ) from last_exception

    async def _cache_response(self, prompt: str, response: str):
        cache_key = self._cache_key(prompt)
        await self.redis.set(
            cache_key, response.encode("utf-8"), ex=self.cache_ttl
        )

    async def _get_cached_response(self, prompt: str) -> Optional[str]:
        cache_key = self._cache_key(prompt)
        result = await self.redis.get(cache_key)
        return result.decode("utf-8") if result else None

    @staticmethod
    def _cache_key(prompt: str) -> str:
        return f"llm_cache:{hashlib.sha256(prompt.encode('utf-8')).hexdigest()}"

Graceful Degradation Strategies

The FallbackResponse dataclass carries metadata that downstream consumers need to handle degraded responses appropriately. The provider field identifies which model generated the response, and the degraded boolean flags any response that did not come from the primary provider. A cache_hit value of True indicates stale cached content.

Downstream services gate behavior on these fields: displaying a notice to users that the response came from a fallback model, suppressing high-stakes operations when running on a degraded provider, or logging quality metrics segmented by provider.

The cached response fallback uses a stale-on-failure strategy: cached content returns only when all live providers are unavailable. No background revalidation runs. Previously successful responses sit in Redis with a configurable TTL. When all providers are in Open state, the system returns the cached response rather than failing entirely. This is a last-ditch fallback: the content may be stale or contextually wrong, but it prevents a hard failure.

Privacy note: If prompts or responses contain PII, encrypt cache values before storage or disable caching for those request types.

When even the cache is empty, the system raises AllProvidersFailedError. In our implementation, the calling service queues the request for later processing. Alternatives include returning an error to the user or triggering an on-call alert.

Distributed State Management with Redis

Redis Key Design and Atomic Operations

The circuit breaker's correctness depends on atomic state transitions. A race condition where two workers simultaneously read the failure count, both see it at 4, and both increment it to 5 could produce inconsistent state. The Lua script embedded in the CircuitBreaker class and registered via register_script in __init__ handles the critical path: incrementing the failure count, applying a TTL to prevent stale accumulation, and transitioning to Open if the threshold is breached, all in a single atomic operation.

The TTL on the state key pulls double duty. When Redis expires the Open state key, a subsequent _get_state() call detects the absence and transitions to Half-Open, allowing a controlled probe before returning to Closed. This keeps the Open, Half-Open, Closed state machine correctly sequenced.

A SETNX-based lock on the half_open_calls key bounds half-open probe concurrency. Exactly one probe executes at a time. If the probe fails, the lock is explicitly deleted so the next recovery window can attempt another probe.

The circuit breaker's correctness depends on atomic state transitions. A race condition where two workers simultaneously read the failure count, both see it at 4, and both increment it to 5 could produce inconsistent state.

Monitoring and Observability

Every state transition should emit a structured log event or increment a Prometheus counter (e.g., circuit_breaker_state_transitions_total{provider="claude",to_state="open"}). How often does each provider's circuit open? Track that with a per-provider open frequency counter. Mean time to recovery tells you how long circuits stay open before successfully closing. Failover rate per provider shows what percentage of requests secondary or tertiary providers handle. Request latency segmented by circuit state quantifies the latency savings of fast-failing open circuits versus waiting for timeouts.

Alert when the primary provider's circuit stays open beyond a few minutes. A prolonged Claude outage likely requires human attention: checking Anthropic's status page, adjusting rate limits, or escalating.

Testing the Circuit Breaker

Unit Testing State Transitions

The circuit breaker's state machine must be tested through its full lifecycle. Using fakeredis provides an in-process Redis-compatible backend that avoids the need for a running Redis instance during tests. Mocked providers simulate controlled failure and recovery sequences.

import pytest
import pytest_asyncio
import fakeredis.aioredis
from unittest.mock import AsyncMock


@pytest_asyncio.fixture
async def redis_client():
    client = fakeredis.aioredis.FakeRedis()
    yield client
    await client.aclose()


@pytest_asyncio.fixture
async def breaker(redis_client):
    return CircuitBreaker(
        "test_provider",
        redis_client,
        CircuitBreakerConfig(
            failure_threshold=3,
            recovery_timeout=5,
            half_open_max_calls=1,
        ),
    )


@pytest.mark.asyncio
async def test_closed_to_open(breaker):
    failing_func = AsyncMock(
        side_effect=ConnectionError("timeout")
    )
    for _ in range(3):
        with pytest.raises(ConnectionError):
            await breaker.call(failing_func)
    state = await breaker._get_state()
    assert state == CircuitState.OPEN


@pytest.mark.asyncio
async def test_open_transitions_to_half_open_after_ttl(breaker):
    await breaker._set_state(CircuitState.OPEN, ttl=1)
    # Record a last_failure_time so _get_state knows the circuit was previously open.
    await breaker.redis.set(
        f"{breaker._key_prefix}:last_failure_time", str(time.time())
    )
    await asyncio.sleep(1.1)  # Wait for TTL expiry
    state = await breaker._get_state()
    assert state == CircuitState.HALF_OPEN


@pytest.mark.asyncio
async def test_half_open_success_closes(breaker):
    await breaker._set_state(CircuitState.HALF_OPEN)
    success_func = AsyncMock(return_value="response")
    result = await breaker.call(success_func)
    assert result == "response"
    state = await breaker._get_state()
    assert state == CircuitState.CLOSED

Chaos Testing in Staging

Unit tests verify the state machine logic, but chaos testing validates the full failover chain under realistic conditions. Tools like toxiproxy or mitmproxy can sit between the application and the Claude API endpoint, injecting 503 responses or artificial latency on demand.

A meaningful chaos test runs the following scenario: inject sustained 503 errors on the Claude endpoint, verify the circuit opens within the expected number of failures, confirm traffic fails over to OpenAI, then to Ollama if OpenAI is also poisoned, and verify that when the injection stops, the circuit transitions through Half-Open back to Closed and traffic returns to Claude.

Load-test during Open state to confirm the system leaks no resources. Open circuits should add near-zero latency (just the Redis lookup), and thread pools should remain healthy because no outbound HTTP connections are attempted.

Production Configuration and Tuning

Recommended Thresholds for LLM APIs

For Claude and OpenAI, a failure_threshold of 5 balances sensitivity against false positives. These cloud APIs can experience brief transient errors that resolve within seconds; a threshold of 3 would cause unnecessary flapping. We haven't measured a universal optimal value here: start at 5 and adjust based on your observed false-positive rate in staging. If your latency budget is tight, consider dropping to 4, but expect more frequent failovers during transient blips. For Ollama running locally, a higher threshold such as 10 reduces false positives but increases latency during genuine outages. Tune based on measured local response times.

Start recovery_timeout at 30 seconds for Claude and OpenAI, then measure your p50 recovery time and set the timeout to roughly 1.5x that value. For Ollama, 10 seconds suffices since local recovery is usually faster. Setting half_open_max_calls to 2 allows a meaningful probe without flooding a recovering service.

These values reflect a trade-off: fast failover to protect the application versus avoiding unnecessary degradation to secondary providers. Systems where provider quality differences are significant may want a longer recovery timeout to give the primary provider more time before probing.

These values reflect a trade-off: fast failover to protect the application versus avoiding unnecessary degradation to secondary providers.

Anti-Patterns to Avoid

Setting failure thresholds too low causes circuit flapping. A threshold of 1 or 2 means a single transient error sends traffic to a fallback provider, and the recovery probe succeeds immediately, creating an oscillation pattern that generates noise in monitoring and degrades user experience unpredictably.

Don't share a single circuit breaker across unrelated API operations. If the Claude messages endpoint is down but the embeddings endpoint is healthy, a shared breaker blocks embedding requests unnecessarily. Each logical operation should have its own breaker instance.

Ignoring idempotency creates a subtle but dangerous risk. If a request that was partially processed by the primary provider gets retried on a fallback provider, side effects may execute twice. The circuit breaker does not solve this; the calling code must ensure requests are safe to retry.

A misconfigured API key will quickly open the circuit if you count 400 and 401 responses as failures, sending all traffic to fallback providers while the real fix is simply correcting the key. The _is_transient_failure method in the implementation above explicitly gates on status codes known to indicate provider-side problems.

Common Pitfalls

  • half_open_calls key persistence: If the circuit re-opens during a half-open probe, the counter key can persist and block future recovery windows. The implementation above cleans up this key in both _record_failure and _record_success, and sets a TTL on the key during half-open probing. Verify with redis-cli keys "circuit:*:half_open_calls" after test runs.
  • failure_count key without TTL: The failure count key is given a TTL of recovery_timeout * 3 via the Lua script, preventing stale counts from a previous outage from accumulating indefinitely and causing false-positive circuit opens after long idle periods.
  • Cross-worker cache misses from hash(): Python's built-in hash() is randomized per process by default (PEP 456). The implementation above uses hashlib.sha256 for deterministic cache keys that work across processes and restarts.

The complete implementation, including all code from this article consolidated into a single runnable module with a README covering setup instructions for Redis and pip dependencies (anthropic, openai, ollama, redis, pytest, pytest-asyncio, fakeredis), is available as a GitHub Gist for direct download and adaptation. (Gist URL to be added upon publication.)

SitePoint TeamSitePoint Team

Sharing our passion for building incredible internet things.

© 2000 – 2026 SitePoint Pty. Ltd.
This site is protected by reCAPTCHA and the Google Privacy Policy and Terms of Service apply.