Skip to content

API Reference

Main Cache Interface

fast_cache.FastAPICache

FastAPICache()

FastAPI Cache Extension.

This class provides caching utilities for FastAPI applications, including decorator-based caching and dependency-injection-based backend access.

Initialize the FastAPICache instance.

Source code in fast_cache/integration.py
def __init__(self) -> None:
    """
    Create an unconfigured FastAPICache.

    Every attribute starts out as ``None``; no backend, application or
    default expiry is attached at construction time.
    """
    # Fallback expiry (seconds or timedelta); None until configured.
    self._default_expire: Optional[Union[int, timedelta]] = None
    # FastAPI application this extension is attached to, if any.
    self._app: Optional[FastAPI] = None
    # Active cache backend; None means "not initialized".
    self._backend: Optional[CacheBackend] = None

get_cache

get_cache()

Get the configured cache backend for dependency injection.

Returns:

Name Type Description
CacheBackend CacheBackend

The configured cache backend instance.

Raises:

Type Description
RuntimeError

If the cache is not initialized.

Source code in fast_cache/integration.py
def get_cache(self) -> CacheBackend:
    """
    Return the active cache backend, e.g. for use as an injected dependency.

    Returns:
        CacheBackend: The backend currently stored on this instance.

    Raises:
        RuntimeError: If no backend has been configured yet.
    """
    backend = self._backend
    if backend is not None:
        return backend
    raise RuntimeError("Cache not initialized. Call init_app first.")

cached

cached(expire=None, key_builder=None, namespace=None, stampede_protection=True, lock_timeout=30, lock_wait=5.0)

Decorator for caching function results.

Parameters:

Name Type Description Default
expire Optional[Union[int, timedelta]]

Expiration time in seconds or as a timedelta.

None
key_builder Optional[Callable[..., str]]

Custom function to build the cache key.

None
namespace Optional[str]

Optional namespace for the cache key.

None
stampede_protection bool

Enable a distributed lock to prevent a thundering herd on cache misses. Requires a backend that supports locking (e.g., Redis); it is silently skipped for backends without lock support. Defaults to True.

True
lock_timeout int

Lock auto-expiry in seconds (deadlock protection). Defaults to 30.

30
lock_wait float

Max seconds to wait for the lock before falling back to a direct function call. Defaults to 5.0.

5.0

Returns:

Name Type Description
Callable Callable[[Callable[..., Any]], Callable[..., Any]]

A decorator that caches the function result.

Source code in fast_cache/integration.py
def cached(
    self,
    expire: Optional[Union[int, timedelta]] = None,
    key_builder: Optional[Callable[..., str]] = None,
    namespace: Optional[str] = None,
    stampede_protection: bool = True,
    lock_timeout: int = 30,
    lock_wait: float = 5.0,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """
    Decorator for caching function results.

    Works for coroutine functions and plain functions alike; the wrapper
    kind is chosen once, at decoration time. Callers may pass
    ``skip_cache=True`` to the wrapped function to bypass the cache for a
    single call. That flag is always consumed by the wrapper and is never
    forwarded to the wrapped function, even when no backend is configured.

    Args:
        expire (Optional[Union[int, timedelta]]): Expiration time in seconds or
            as a timedelta. A falsy value (``None`` or ``0``) falls back to the
            instance-wide default expiry.
        key_builder (Optional[Callable[..., str]]): Custom function to build the
            cache key. Receives the call's positional and keyword arguments.
        namespace (Optional[str]): Optional prefix prepended to the cache key.
        stampede_protection (bool): Enable a lock around the rebuild on cache
            miss. Silently skipped when the backend lacks the required lock
            method (``acquire_lock`` for sync functions, ``aacquire_lock`` for
            async functions). Defaults to True.
        lock_timeout (int): Passed to the backend as the lock ``timeout``.
            Defaults to 30.
        lock_wait (float): Passed to the backend as the lock ``wait``. If no
            lock token is obtained, the function is called directly and the
            result is not cached. Defaults to 5.0.

    Returns:
        Callable: A decorator that caches the function result.
    """

    def decorator(func: Callable) -> Callable[..., Any]:
        """
        Wrap ``func`` with the caching logic.

        Args:
            func (Callable): The function to be cached.

        Returns:
            Callable: The wrapped function with caching.
        """
        is_async = inspect.iscoroutinefunction(func)

        def build_cache_key(*args, **kwargs) -> str:
            """
            Build the cache key for one call.

            Args:
                *args: Positional arguments for the function.
                **kwargs: Keyword arguments for the function.

            Returns:
                str: The generated cache key.
            """
            if key_builder is not None:
                key = key_builder(*args, **kwargs)
            else:
                # Default key: module, function name and the str() of the arguments.
                key = f"{func.__module__}:{func.__name__}:{str(args)}:{str(kwargs)}"

            if namespace:
                key = f"{namespace}:{key}"

            return key

        def _use_lock(backend: Any, acquire_method: str) -> bool:
            # Probe for the method that will actually be called, so a backend
            # offering only the sync (or only the async) lock API is skipped
            # instead of failing with AttributeError.
            return stampede_protection and hasattr(backend, acquire_method)

        @wraps(func)
        async def async_wrapper(*args, **kwargs) -> Any:
            """
            Async wrapper for caching.

            Args:
                *args: Positional arguments.
                **kwargs: Keyword arguments (``skip_cache`` is consumed here).

            Returns:
                Any: The cached or computed result.
            """
            # Strip the control flag first so it never reaches ``func``,
            # regardless of whether a backend is configured.
            skip_cache = kwargs.pop("skip_cache", False)
            backend = self._backend
            if backend is None or skip_cache:
                return await func(*args, **kwargs)

            cache_key = build_cache_key(*args, **kwargs)
            effective_expire = expire or self._default_expire

            # Fast path: cache hit
            cached_value = await backend.aget(cache_key, default=_CACHE_MISS)
            if cached_value is not _CACHE_MISS:
                return cached_value

            if not _use_lock(backend, "aacquire_lock"):
                # No stampede protection: compute and store.
                result = await func(*args, **kwargs)
                await backend.aset(cache_key, result, expire=effective_expire)
                return result

            token = await backend.aacquire_lock(
                cache_key, timeout=lock_timeout, wait=lock_wait
            )
            if token is None:
                # Lock not obtained: another caller may have finished meanwhile.
                cached_value = await backend.aget(cache_key, default=_CACHE_MISS)
                if cached_value is not _CACHE_MISS:
                    return cached_value
                # Still no value: fall back to a direct, uncached call.
                return await func(*args, **kwargs)

            try:
                # Re-check under the lock: a previous lock holder may have
                # populated the key while this caller was waiting, in which
                # case recomputing would defeat the stampede protection.
                cached_value = await backend.aget(cache_key, default=_CACHE_MISS)
                if cached_value is not _CACHE_MISS:
                    return cached_value

                result = await func(*args, **kwargs)
                await backend.aset(cache_key, result, expire=effective_expire)
                return result
            finally:
                await backend.arelease_lock(cache_key, token)

        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            """
            Sync wrapper for caching.

            Args:
                *args: Positional arguments.
                **kwargs: Keyword arguments (``skip_cache`` is consumed here).

            Returns:
                Any: The cached or computed result.
            """
            # Strip the control flag first so it never reaches ``func``,
            # regardless of whether a backend is configured.
            skip_cache = kwargs.pop("skip_cache", False)
            backend = self._backend
            if backend is None or skip_cache:
                return func(*args, **kwargs)

            cache_key = build_cache_key(*args, **kwargs)
            effective_expire = expire or self._default_expire

            # Fast path: cache hit
            cached_value = backend.get(cache_key, default=_CACHE_MISS)
            if cached_value is not _CACHE_MISS:
                return cached_value

            if not _use_lock(backend, "acquire_lock"):
                # No stampede protection: compute and store.
                result = func(*args, **kwargs)
                backend.set(cache_key, result, expire=effective_expire)
                return result

            token = backend.acquire_lock(
                cache_key, timeout=lock_timeout, wait=lock_wait
            )
            if token is None:
                # Lock not obtained: another caller may have finished meanwhile.
                cached_value = backend.get(cache_key, default=_CACHE_MISS)
                if cached_value is not _CACHE_MISS:
                    return cached_value
                # Still no value: fall back to a direct, uncached call.
                return func(*args, **kwargs)

            try:
                # Re-check under the lock (see async_wrapper for rationale).
                cached_value = backend.get(cache_key, default=_CACHE_MISS)
                if cached_value is not _CACHE_MISS:
                    return cached_value

                result = func(*args, **kwargs)
                backend.set(cache_key, result, expire=effective_expire)
                return result
            finally:
                backend.release_lock(cache_key, token)

        return async_wrapper if is_async else sync_wrapper

    return decorator

lifespan_handler async

lifespan_handler(app)

Lifespan context manager for FastAPI.

This can be used as the lifespan argument to FastAPI to manage cache lifecycle.

Parameters:

Name Type Description Default
app FastAPI

The FastAPI application instance.

required

Yields:

Type Description
AsyncIterator[None]

None

Source code in fast_cache/integration.py
@asynccontextmanager
async def lifespan_handler(self, app: FastAPI) -> AsyncIterator[None]:
    """
    Lifespan context manager for FastAPI.

    This can be used as the `lifespan` argument to FastAPI to manage
    cache lifecycle. On entry this instance is published on ``app.state``
    under the name ``cache``; on exit the backend is closed (preferring an
    awaitable ``aclose`` over a plain ``close``) and the stored backend and
    application references are reset to ``None``.

    Args:
        app (FastAPI): The FastAPI application instance.

    Yields:
        None
    """
    if not hasattr(app, "state"):
        app.state = {}

    state = app.state
    try:
        # Mapping-style state (including the dict created just above).
        state["cache"] = self
    except TypeError:
        # Attribute-style state objects do not support item assignment;
        # publish the cache as an attribute instead.
        state.cache = self

    try:
        yield
    finally:
        backend = self._backend
        try:
            if backend is not None:
                close = getattr(backend, "aclose", None)
                if close:
                    await close()
                else:
                    close = getattr(backend, "close", None)
                    if close:
                        close()
        finally:
            # Drop the references even if the application body or the
            # backend's close call raised.
            self._backend = None
            self._app = None

init_app

init_app(app, backend, default_expire=None)

Initialize the cache extension.

Parameters:

Name Type Description Default
app FastAPI

FastAPI application instance.

required
backend CacheBackend

Cache backend instance.

required
default_expire Optional[Union[int, timedelta]]

Default expiration time for cached items.

None
Source code in fast_cache/integration.py
def init_app(
    self,
    app: FastAPI,
    backend: CacheBackend,
    default_expire: Optional[Union[int, timedelta]] = None,
) -> None:
    """
    Attach an application and a backend to this cache extension.

    Args:
        app (FastAPI): FastAPI application instance to remember.
        backend (CacheBackend): Cache backend instance to use from now on.
        default_expire (Optional[Union[int, timedelta]]): Expiration applied
            when a cached function does not specify its own.
    """
    # Store all three settings in one step; any earlier values are replaced.
    self._backend, self._app, self._default_expire = (
        backend,
        app,
        default_expire,
    )

Backends

fast_cache.InMemoryBackend

InMemoryBackend(namespace='fastapi-cache', max_size=None, cleanup_interval=30)

Bases: CacheBackend


              flowchart TD
              fast_cache.InMemoryBackend[InMemoryBackend]
              fast_cache.backends.backend.CacheBackend[CacheBackend]

                              fast_cache.backends.backend.CacheBackend --> fast_cache.InMemoryBackend
                


              click fast_cache.InMemoryBackend href "" "fast_cache.InMemoryBackend"
              click fast_cache.backends.backend.CacheBackend href "" "fast_cache.backends.backend.CacheBackend"
            

Initializes a new instance of the InMemoryBackend cache.

This backend provides an in-memory cache with optional LRU (Least Recently Used) eviction, namespace support, thread and async safety, and automatic periodic cleanup of expired items. It is suitable for single-process, multi-threaded, or asyncio-based applications.

Parameters:

Name Type Description Default
namespace str

A namespace prefix for all cache keys. This allows multiple independent caches to share the same process. Defaults to "fastapi-cache".

'fastapi-cache'
max_size Optional[int]

The maximum number of items to store in the cache. If set, the cache will evict the least recently used items when the limit is exceeded. If None, the cache size is unlimited. Defaults to None.

None
cleanup_interval int

The interval, in seconds, at which the background cleanup job runs to remove expired cache entries. Defaults to 30.

30
Notes
  • The backend uses an OrderedDict to maintain LRU order.
  • Both synchronous (thread-safe) and asynchronous (asyncio-safe) operations are supported.
  • Expired items are removed automatically by a background scheduler.
  • This backend is not suitable for multi-process or distributed environments.

Initialize the in-memory cache backend.

Parameters:

Name Type Description Default
namespace str

Namespace prefix for all keys.

'fastapi-cache'
max_size Optional[int]

Optional maximum number of items (LRU eviction if set).

None
cleanup_interval int

Interval in seconds for background cleanup.

30
Source code in fast_cache/backends/memory.py
def __init__(
    self,
    namespace: str = "fastapi-cache",
    max_size: Optional[int] = None,
    cleanup_interval: int = 30,
) -> None:
    """
    Set up an empty in-memory cache and start its cleanup scheduler.

    Args:
        namespace: Namespace prefix for all keys.
        max_size: Optional maximum number of items (LRU eviction if set).
        cleanup_interval: Interval in seconds for background cleanup.
    """
    # Configuration supplied by the caller.
    self._namespace = namespace
    self._max_size = max_size
    self._cleanup_interval = cleanup_interval

    # Storage: key -> (value, expiry) in an ordered mapping.
    self._cache: OrderedDict[str, Tuple[Any, Optional[float]]] = OrderedDict()

    # One lock for the synchronous API, one for the asynchronous API.
    self._lock = threading.Lock()
    self._async_lock = asyncio.Lock()

    # Scheduler state must exist before the scheduler is started.
    self._scheduler_lock = threading.Lock()
    self._scheduler = None
    self._start_cleanup_scheduler()

get

get(key, default=None)

Synchronously retrieves a value from the cache by key.

If the key does not exist or the entry has expired, returns default. If the entry is expired, it is deleted from the cache (lazy deletion). Accessing an item moves it to the end of the LRU order.

Parameters:

Name Type Description Default
key str

The cache key to retrieve.

required
default Any

Value to return if key is not found. Defaults to None.

None

Returns:

Name Type Description
Any Any

The cached Python object, or default if not found or expired.

Notes
  • Thread-safe.
  • Expired entries are removed on access.
  • Updates LRU order on access.
Source code in fast_cache/backends/memory.py
def get(self, key: str, default: Any = None) -> Any:
    """
    Look up ``key`` synchronously and return its cached value.

    A missing or expired entry yields ``default``. An expired entry is
    removed as part of the lookup (lazy deletion), and a successful lookup
    moves the entry to the end of the LRU order.

    Args:
        key (str): The cache key to retrieve.
        default (Any): Value to return if key is not found. Defaults to None.

    Returns:
        Any: The cached Python object, or default if not found or expired.

    Notes:
        - Runs under the backend's threading lock.
        - Expired entries are removed on access.
        - Updates LRU order on access.
    """
    namespaced = self._make_key(key)
    with self._lock:
        entry = self._cache.get(namespaced)
        if not entry:
            return default

        value, expires_at = entry
        if self._is_expired(expires_at):
            # Lazy deletion of the stale entry.
            self._cache.pop(namespaced, None)
            return default

        self._cache.move_to_end(namespaced)
        return value

set

set(key, value, expire=None)

Synchronously stores a value in the cache under the specified key.

If the key already exists, its value and expiration time are updated. Optionally, an expiration time can be set, after which the entry will be considered expired and eligible for deletion. Setting an item moves it to the end of the LRU order.

Parameters:

Name Type Description Default
key str

The cache key to store the value under.

required
value Any

The Python object to cache.

required
expire Optional[Union[int, timedelta]]

The expiration time for the cache entry. Can be specified as an integer (seconds) or a timedelta. If None, the entry does not expire.

None
Notes
  • Thread-safe.
  • Triggers LRU eviction if max_size is set.
  • Updates LRU order on set.
Source code in fast_cache/backends/memory.py
def set(
    self, key: str, value: Any, expire: Optional[Union[int, timedelta]] = None
) -> None:
    """
    Store ``value`` under ``key`` synchronously.

    An existing entry is overwritten together with its expiration. The
    entry is moved to the end of the LRU order and the eviction hook is
    invoked afterwards.

    Args:
        key (str): The cache key to store the value under.
        value (Any): The Python object to cache.
        expire (Optional[Union[int, timedelta]], optional): The expiration time
            for the cache entry, as an integer (seconds) or a timedelta. If
            None, the entry does not expire.

    Notes:
        - Runs under the backend's threading lock.
        - Triggers LRU eviction if max_size is set.
        - Updates LRU order on set.
    """
    namespaced = self._make_key(key)
    entry = (value, self._get_expire_time(expire))
    with self._lock:
        store = self._cache
        store[namespaced] = entry
        store.move_to_end(namespaced)
        self._evict_if_needed()

delete

delete(key)

Synchronously deletes a cache entry by key.

If the key does not exist, this method does nothing.

Parameters:

Name Type Description Default
key str

The cache key to delete.

required
Notes
  • Thread-safe.
  • The key is automatically namespaced.
Source code in fast_cache/backends/memory.py
def delete(self, key: str) -> None:
    """
    Remove the entry stored under ``key`` synchronously.

    Deleting a key that is not present is a no-op.

    Args:
        key (str): The cache key to delete.

    Notes:
        - Runs under the backend's threading lock.
        - The key is automatically namespaced.
    """
    namespaced = self._make_key(key)
    with self._lock:
        store = self._cache
        if namespaced in store:
            del store[namespaced]

clear

clear()

Synchronously removes all cache entries in the current namespace.

This method deletes all entries whose keys match the current namespace prefix.

Notes
  • Thread-safe.
  • Only entries in the current namespace are affected.
  • This operation can be expensive if the cache is large.
Source code in fast_cache/backends/memory.py
def clear(self) -> None:
    """
    Drop every entry belonging to this backend's namespace, synchronously.

    Only keys that start with ``"<namespace>:"`` are removed; other keys
    in the underlying mapping are left alone.

    Notes:
        - Runs under the backend's threading lock.
        - Only entries in the current namespace are affected.
        - Every stored key is inspected, so this can be expensive for a
          large cache.
    """
    prefix = f"{self._namespace}:"
    with self._lock:
        store = self._cache
        # Snapshot the matches first: the mapping cannot be mutated while
        # it is being iterated.
        doomed = tuple(name for name in store if name.startswith(prefix))
        for name in doomed:
            store.pop(name, None)

has

has(key)

Synchronously checks if a cache key exists and is not expired.

Parameters:

Name Type Description Default
key str

The cache key to check.

required

Returns:

Name Type Description
bool bool

True if the key exists and is not expired, False otherwise.

Notes
  • Thread-safe.
  • Expired entries are not considered present and are removed on check.
  • Updates LRU order on access.
Source code in fast_cache/backends/memory.py
def has(self, key: str) -> bool:
    """
    Report synchronously whether ``key`` is present and still valid.

    Args:
        key (str): The cache key to check.

    Returns:
        bool: True if the key exists and is not expired, False otherwise.

    Notes:
        - Runs under the backend's threading lock.
        - An expired entry counts as absent and is removed by the check.
        - A valid entry is moved to the end of the LRU order.
    """
    namespaced = self._make_key(key)
    with self._lock:
        entry = self._cache.get(namespaced)
        if not entry:
            return False

        _value, expires_at = entry
        if self._is_expired(expires_at):
            self._cache.pop(namespaced, None)
            return False

        self._cache.move_to_end(namespaced)
        return True

aget async

aget(key, default=None)

Asynchronously retrieves a value from the cache by key.

If the key does not exist or the entry has expired, returns default. If the entry is expired, it is deleted from the cache (lazy deletion). Accessing an item moves it to the end of the LRU order.

Parameters:

Name Type Description Default
key str

The cache key to retrieve.

required
default Any

Value to return if key is not found. Defaults to None.

None

Returns:

Name Type Description
Any Any

The cached Python object, or default if not found or expired.

Notes
  • Asyncio-safe.
  • Expired entries are removed on access.
  • Updates LRU order on access.
Source code in fast_cache/backends/memory.py
async def aget(self, key: str, default: Any = None) -> Any:
    """
    Look up ``key`` asynchronously and return its cached value.

    A missing or expired entry yields ``default``. An expired entry is
    removed as part of the lookup (lazy deletion), and a successful lookup
    moves the entry to the end of the LRU order.

    Args:
        key (str): The cache key to retrieve.
        default (Any): Value to return if key is not found. Defaults to None.

    Returns:
        Any: The cached Python object, or default if not found or expired.

    Notes:
        - Runs under the backend's asyncio lock.
        - Expired entries are removed on access.
        - Updates LRU order on access.
    """
    cache_key = self._make_key(key)
    async with self._async_lock:
        record = self._cache.get(cache_key)
        if record:
            payload, deadline = record
            if self._is_expired(deadline):
                # Lazy deletion of the stale entry; fall through to default.
                self._cache.pop(cache_key, None)
            else:
                self._cache.move_to_end(cache_key)
                return payload
    return default

aset async

aset(key, value, expire=None)

Asynchronously stores a value in the cache under the specified key.

If the key already exists, its value and expiration time are updated. Optionally, an expiration time can be set, after which the entry will be considered expired and eligible for deletion. Setting an item moves it to the end of the LRU order.

Parameters:

Name Type Description Default
key str

The cache key to store the value under.

required
value Any

The Python object to cache.

required
expire Optional[Union[int, timedelta]]

The expiration time for the cache entry. Can be specified as an integer (seconds) or a timedelta. If None, the entry does not expire.

None
Notes
  • Asyncio-safe.
  • Triggers LRU eviction if max_size is set.
  • Updates LRU order on set.
Source code in fast_cache/backends/memory.py
async def aset(
    self, key: str, value: Any, expire: Optional[Union[int, timedelta]] = None
) -> None:
    """
    Store ``value`` under ``key`` asynchronously.

    An existing entry is overwritten together with its expiration. The
    entry is moved to the end of the LRU order and the eviction hook is
    invoked afterwards.

    Args:
        key (str): The cache key to store the value under.
        value (Any): The Python object to cache.
        expire (Optional[Union[int, timedelta]], optional): The expiration time
            for the cache entry, as an integer (seconds) or a timedelta. If
            None, the entry does not expire.

    Notes:
        - Runs under the backend's asyncio lock.
        - Triggers LRU eviction if max_size is set.
        - Updates LRU order on set.
    """
    cache_key = self._make_key(key)
    deadline = self._get_expire_time(expire)
    record = (value, deadline)
    async with self._async_lock:
        self._cache[cache_key] = record
        # Re-position explicitly: overwriting an existing key keeps its
        # old position in an OrderedDict.
        self._cache.move_to_end(cache_key)
        self._evict_if_needed()

adelete async

adelete(key)

Asynchronously deletes a cache entry by key.

If the key does not exist, this method does nothing.

Parameters:

Name Type Description Default
key str

The cache key to delete.

required
Notes
  • Asyncio-safe.
  • The key is automatically namespaced.
Source code in fast_cache/backends/memory.py
async def adelete(self, key: str) -> None:
    """
    Remove the entry stored under ``key`` asynchronously.

    Deleting a key that is not present is a no-op.

    Args:
        key (str): The cache key to delete.

    Notes:
        - Runs under the backend's asyncio lock.
        - The key is automatically namespaced.
    """
    cache_key = self._make_key(key)
    async with self._async_lock:
        store = self._cache
        if cache_key in store:
            del store[cache_key]

aclear async

aclear()

Asynchronously removes all cache entries in the current namespace.

This method deletes all entries whose keys match the current namespace prefix.

Notes
  • Asyncio-safe.
  • Only entries in the current namespace are affected.
  • This operation can be expensive if the cache is large.
Source code in fast_cache/backends/memory.py
async def aclear(self) -> None:
    """
    Drop every entry belonging to this backend's namespace, asynchronously.

    Only keys that start with ``"<namespace>:"`` are removed; other keys
    in the underlying mapping are left alone.

    Notes:
        - Runs under the backend's asyncio lock.
        - Only entries in the current namespace are affected.
        - Every stored key is inspected, so this can be expensive for a
          large cache.
    """
    prefix = f"{self._namespace}:"
    async with self._async_lock:
        store = self._cache
        # Snapshot the matches before mutating the mapping.
        matching = list(filter(lambda name: name.startswith(prefix), store))
        for name in matching:
            store.pop(name, None)

ahas async

ahas(key)

Asynchronously checks if a cache key exists and is not expired.

Parameters:

Name Type Description Default
key str

The cache key to check.

required

Returns:

Name Type Description
bool bool

True if the key exists and is not expired, False otherwise.

Notes
  • Asyncio-safe.
  • Expired entries are not considered present and are removed on check.
  • Updates LRU order on access.
Source code in fast_cache/backends/memory.py
async def ahas(self, key: str) -> bool:
    """
    Report asynchronously whether ``key`` is present and still valid.

    Args:
        key (str): The cache key to check.

    Returns:
        bool: True if the key exists and is not expired, False otherwise.

    Notes:
        - Runs under the backend's asyncio lock.
        - An expired entry counts as absent and is removed by the check.
        - A valid entry is moved to the end of the LRU order.
    """
    cache_key = self._make_key(key)
    async with self._async_lock:
        record = self._cache.get(cache_key)
        if record:
            _payload, deadline = record
            if self._is_expired(deadline):
                # Stale entry: remove it and report absence below.
                self._cache.pop(cache_key, None)
            else:
                self._cache.move_to_end(cache_key)
                return True
    return False

close

close()

Closes the backend and stops the background cleanup scheduler.

This method should be called when the backend is no longer needed to ensure all resources are released and background jobs are stopped.

Notes
  • After calling this method, the cache is cleared and cannot be used.
  • The background cleanup scheduler is stopped.
Source code in fast_cache/backends/memory.py
def close(self) -> None:
    """
    Shut the backend down: stop background cleanup, then drop all entries.

    Call this once the backend is no longer needed so that the cleanup
    scheduler does not keep running.

    Notes:
        - The cleanup scheduler is stopped before the store is emptied.
        - Every entry in the underlying mapping is discarded, regardless
          of namespace.
    """
    self._stop_cleanup_scheduler()
    store = self._cache
    store.clear()

fast_cache.RedisBackend

RedisBackend(redis_url, namespace='fastapi-cache', pool_size=10, max_connections=20)

Bases: CacheBackend


              flowchart TD
              fast_cache.RedisBackend[RedisBackend]
              fast_cache.backends.backend.CacheBackend[CacheBackend]

                              fast_cache.backends.backend.CacheBackend --> fast_cache.RedisBackend
                


              click fast_cache.RedisBackend href "" "fast_cache.RedisBackend"
              click fast_cache.backends.backend.CacheBackend href "" "fast_cache.backends.backend.CacheBackend"
            

Redis cache backend implementation with namespace support.

Attributes:

Name Type Description
_namespace str

Namespace prefix for all keys.

_sync_pool ConnectionPool

Synchronous Redis connection pool.

_async_pool ConnectionPool

Asynchronous Redis connection pool.

_sync_client Redis

Synchronous Redis client.

_async_client Redis

Asynchronous Redis client.

Initialize Redis backend with connection URL and pool settings.

Parameters:

Name Type Description Default
redis_url str

Redis connection URL (e.g., "redis://localhost:6379/0").

required
namespace str

Namespace prefix for all keys (default: "fastapi-cache").

'fastapi-cache'
pool_size int

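Intended minimum number of connections in the pool. Note: the constructor shown below does not pass this value to either connection pool; only max_connections is applied.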

10
max_connections int

Maximum number of connections in the pool.

20
Source code in fast_cache/backends/redis.py
def __init__(
    self,
    redis_url: str,
    namespace: str = "fastapi-cache",
    pool_size: int = 10,
    max_connections: int = 20,
) -> None:
    """
    Initialize Redis backend with connection URL and pool settings.

    Builds one synchronous and one asynchronous connection pool from
    ``redis_url`` and wraps each in a client. Responses are left as raw
    bytes (``decode_responses=False``).

    Args:
        redis_url (str): Redis connection URL (e.g., "redis://localhost:6379/0").
        namespace (str): Namespace prefix for all keys (default: "fastapi-cache").
        pool_size (int): Accepted for API compatibility. NOTE: this value is
            not used by this constructor; only ``max_connections`` is passed
            to the connection pools.
        max_connections (int): Maximum number of connections in each pool.

    Raises:
        ImportError: If the optional ``redis`` package is not installed. The
            original import failure is attached as the exception's cause.
    """

    # Imported lazily so the package can be used without the optional
    # redis dependency installed.
    try:
        import redis.asyncio as aioredis
        import redis
    except ImportError as exc:
        # Chain explicitly so the traceback shows which import failed.
        raise ImportError(
            "RedisBackend requires the 'redis' package. "
            "Install it with: pip install fast-cache[redis]"
        ) from exc

    self._namespace = namespace
    self._sync_pool = redis.ConnectionPool.from_url(
        redis_url, max_connections=max_connections, decode_responses=False
    )

    self._async_pool = aioredis.ConnectionPool.from_url(
        redis_url,
        max_connections=max_connections,
        decode_responses=False,
        encoding="utf-8",
    )

    self._sync_client = redis.Redis(connection_pool=self._sync_pool)
    self._async_client = aioredis.Redis(connection_pool=self._async_pool)

aget async

aget(key, default=None)

Asynchronously retrieve a value from the cache.

Parameters:

Name Type Description Default
key str

The key to retrieve.

required
default Any

Value to return if key is not found. Defaults to None.

None

Returns:

Name Type Description
Any Any

The cached value, or default if not found.

Source code in fast_cache/backends/redis.py
async def aget(self, key: str, default: Any = None) -> Any:
    """
    Asynchronously look up ``key`` and unpickle the stored payload.

    Args:
        key (str): Cache key; it is passed through ``_make_key`` before lookup.
        default (Any): Returned on a miss, on an empty payload, or when the
            lookup or deserialization raises. Defaults to None.

    Returns:
        Any: The unpickled value, or ``default``.
    """
    try:
        payload = await self._async_client.get(self._make_key(key))
        if not payload:
            return default
        # NOTE: pickle.loads runs on whatever bytes the server returns, so
        # this is only safe when the Redis instance is trusted.
        return pickle.loads(payload)
    except Exception as exc:
        # Best-effort cache: log the failure and behave like a miss.
        logger.warning("Cache aget failed: %s", exc)
        return default

get

get(key, default=None)

Synchronously retrieve a value from the cache.

Parameters:

Name Type Description Default
key str

The key to retrieve.

required
default Any

Value to return if key is not found. Defaults to None.

None

Returns:

Name Type Description
Any Any

The cached value, or default if not found.

Source code in fast_cache/backends/redis.py
def get(self, key: str, default: Any = None) -> Any:
    """
    Look up ``key`` through the synchronous client and unpickle the payload.

    Args:
        key (str): Cache key; it is passed through ``_make_key`` before lookup.
        default (Any): Returned on a miss, on an empty payload, or when the
            lookup or deserialization raises. Defaults to None.

    Returns:
        Any: The unpickled value, or ``default``.
    """
    try:
        payload = self._sync_client.get(self._make_key(key))
        if not payload:
            return default
        # NOTE: pickle.loads runs on whatever bytes the server returns, so
        # this is only safe when the Redis instance is trusted.
        return pickle.loads(payload)
    except Exception as exc:
        # Best-effort cache: log the failure and behave like a miss.
        logger.warning("Cache get failed: %s", exc)
        return default

aset async

aset(key, value, expire=None)

Asynchronously set a value in the cache.

Parameters:

Name Type Description Default
key str

The key under which to store the value.

required
value Any

The value to store.

required
expire Optional[Union[int, timedelta]]

Expiration time in seconds or as timedelta.

None
Source code in fast_cache/backends/redis.py
async def aset(
    self, key: str, value: Any, expire: Optional[Union[int, timedelta]] = None
) -> None:
    """
    Asynchronously set a value in the cache.

    The value is pickled before being written. Failures are logged and
    swallowed, so a broken cache never breaks the caller.

    Args:
        key (str): The key under which to store the value.
        value (Any): The value to store.
        expire (Optional[Union[int, timedelta]]): Expiration time in seconds
            or as timedelta. A timedelta is truncated to whole seconds.
            None stores the value without an expiry.
    """
    try:
        # total_seconds() returns a float, which the Redis client rejects for
        # ``ex``. The error would be swallowed below and the value never
        # cached, so convert to whole seconds first.
        ex = int(expire.total_seconds()) if isinstance(expire, timedelta) else expire
        await self._async_client.set(
            self._make_key(key), pickle.dumps(value), ex=ex
        )
    except Exception as e:
        logger.warning("Cache aset failed: %s", e)

set

set(key, value, expire=None)

Synchronously set a value in the cache.

Parameters:

Name Type Description Default
key str

The key under which to store the value.

required
value Any

The value to store.

required
expire Optional[Union[int, timedelta]]

Expiration time in seconds or as timedelta.

None
Source code in fast_cache/backends/redis.py
def set(
    self, key: str, value: Any, expire: Optional[Union[int, timedelta]] = None
) -> None:
    """
    Synchronously set a value in the cache.

    The value is pickled before being written. Failures are logged and
    swallowed, so a broken cache never breaks the caller.

    Args:
        key (str): The key under which to store the value.
        value (Any): The value to store.
        expire (Optional[Union[int, timedelta]]): Expiration time in seconds
            or as timedelta. A timedelta is truncated to whole seconds.
            None stores the value without an expiry.
    """
    try:
        # total_seconds() returns a float, which the Redis client rejects for
        # ``ex``. The error would be swallowed below and the value never
        # cached, so convert to whole seconds first.
        ex = int(expire.total_seconds()) if isinstance(expire, timedelta) else expire
        self._sync_client.set(self._make_key(key), pickle.dumps(value), ex=ex)
    except Exception as e:
        logger.warning("Cache set failed: %s", e)

adelete async

adelete(key)

Asynchronously delete a value from the cache.

Parameters:

Name Type Description Default
key str

The key to delete.

required
Source code in fast_cache/backends/redis.py
async def adelete(self, key: str) -> None:
    """
    Asynchronously remove ``key`` from the cache.

    Errors raised by the client are logged and swallowed.

    Args:
        key (str): The key to delete; it is namespaced via ``_make_key``.
    """
    try:
        namespaced = self._make_key(key)
        await self._async_client.delete(namespaced)
    except Exception as exc:
        logger.warning("Cache adelete failed: %s", exc)

delete

delete(key)

Synchronously delete a value from the cache.

Parameters:

Name Type Description Default
key str

The key to delete.

required
Source code in fast_cache/backends/redis.py
def delete(self, key: str) -> None:
    """
    Remove ``key`` from the cache using the synchronous client.

    Errors raised by the client are logged and swallowed.

    Args:
        key (str): The key to delete; it is namespaced via ``_make_key``.
    """
    try:
        namespaced = self._make_key(key)
        self._sync_client.delete(namespaced)
    except Exception as exc:
        logger.warning("Cache delete failed: %s", exc)

aclear async

aclear()

Asynchronously clear all values from the namespace.

Source code in fast_cache/backends/redis.py
async def aclear(self) -> None:
    """
    Asynchronously delete every key returned by ``_scan_keys``.

    Nothing is sent to the server when the scan returns no keys. Errors are
    logged and swallowed.
    """
    try:
        doomed = await self._scan_keys()
        if not doomed:
            return
        # One DELETE call carrying all collected keys.
        await self._async_client.delete(*doomed)
    except Exception as exc:
        logger.warning("Cache aclear failed: %s", exc)

clear

clear()

Synchronously clear all values from the namespace.

Source code in fast_cache/backends/redis.py
def clear(self) -> None:
    """
    Synchronously delete every key matching the namespace pattern.

    Walks the keyspace with SCAN in batches (count hint of 100) and deletes
    each non-empty batch as it arrives. Errors are logged and swallowed.
    """
    try:
        pattern = self._make_key("*")
        position = 0
        finished = False

        while not finished:
            position, batch = self._sync_client.scan(
                cursor=position, match=pattern, count=100
            )
            if batch:
                self._sync_client.delete(*batch)
            # The scan is complete once the returned cursor is 0 again.
            finished = position == 0
    except Exception as exc:
        logger.warning("Cache clear failed: %s", exc)

ahas async

ahas(key)

Asynchronously check if a key exists in the cache.

Parameters:

Name Type Description Default
key str

The key to check.

required

Returns:

Name Type Description
bool bool

True if the key exists, False otherwise.

Source code in fast_cache/backends/redis.py
async def ahas(self, key: str) -> bool:
    """
    Asynchronously report whether ``key`` is present in the cache.

    Args:
        key (str): The key to check; it is namespaced via ``_make_key``.

    Returns:
        bool: True when EXISTS reports at least one match. False otherwise,
        including when the client raises.
    """
    try:
        matches = await self._async_client.exists(self._make_key(key))
        return matches > 0
    except Exception as exc:
        logger.warning("Cache ahas failed: %s", exc)
        return False

has

has(key)

Synchronously check if a key exists in the cache.

Parameters:

Name Type Description Default
key str

The key to check.

required

Returns:

Name Type Description
bool bool

True if the key exists, False otherwise.

Source code in fast_cache/backends/redis.py
def has(self, key: str) -> bool:
    """
    Report whether ``key`` is present in the cache (synchronous client).

    Args:
        key (str): The key to check; it is namespaced via ``_make_key``.

    Returns:
        bool: True when EXISTS reports at least one match. False otherwise,
        including when the client raises.
    """
    try:
        matches = self._sync_client.exists(self._make_key(key))
        return matches > 0
    except Exception as exc:
        logger.warning("Cache has failed: %s", exc)
        return False

acquire_lock

acquire_lock(key, timeout=30, wait=5.0, poll_interval=0.05)

Acquire a distributed lock for stampede protection.

Parameters:

Name Type Description Default
key str

Cache key to lock (lock suffix added internally).

required
timeout int

Lock auto-expiry in seconds (deadlock protection).

30
wait float

Max seconds to poll before giving up.

5.0
poll_interval float

Sleep between poll attempts in seconds.

0.05

Returns:

Type Description
Optional[str]

A token string if acquired, None if the lock could not be obtained within the wait period or on Redis failure.

Source code in fast_cache/backends/redis.py
def acquire_lock(
    self,
    key: str,
    timeout: int = 30,
    wait: float = 5.0,
    poll_interval: float = 0.05,
) -> Optional[str]:
    """
    Try to take the distributed lock guarding ``key``, polling until a deadline.

    Each attempt is a ``SET ... NX EX timeout`` of a fresh random token on the
    lock key (``"<key>:_lock"`` after namespacing), so only one caller can
    succeed while the lock key exists.

    Args:
        key: Cache key to lock (lock suffix added internally).
        timeout: Lock auto-expiry in seconds (deadlock protection).
        wait: Max seconds to poll before giving up.
        poll_interval: Sleep between poll attempts in seconds.

    Returns:
        The token identifying this holder if the lock was acquired, or None
        when the wait period elapsed or the Redis call raised.
    """
    lock_name = self._make_key(f"{key}:_lock")
    owner_token = uuid.uuid4().hex
    give_up_at = time.monotonic() + wait

    try:
        # Keep retrying while the NX write is refused (lock held elsewhere).
        while not self._sync_client.set(
            lock_name, owner_token, nx=True, ex=timeout
        ):
            if time.monotonic() >= give_up_at:
                return None
            time.sleep(poll_interval)
        return owner_token
    except Exception as exc:
        logger.warning("Lock acquire failed: %s", exc)
        return None

release_lock

release_lock(key, token)

Release a distributed lock only if the caller still owns it.

Uses a Lua script to atomically check the token and delete, preventing release of a lock re-acquired by another process.

Parameters:

Name Type Description Default
key str

Cache key that was locked.

required
token str

The token returned by acquire_lock.

required

Returns:

Type Description
bool

True if the lock was released, False otherwise.

Source code in fast_cache/backends/redis.py
def release_lock(self, key: str, token: str) -> bool:
    """
    Release the distributed lock for ``key`` if ``token`` still owns it.

    The ownership check and the delete are delegated to the module's
    ``_UNLOCK_SCRIPT``, evaluated server-side with the lock key and the token.

    Args:
        key: Cache key that was locked.
        token: The token returned by acquire_lock.

    Returns:
        True if the script reported a release (result ``1``), False otherwise,
        including when the Redis call raised.
    """
    lock_name = self._make_key(f"{key}:_lock")
    try:
        outcome = self._sync_client.eval(_UNLOCK_SCRIPT, 1, lock_name, token)
        return outcome == 1
    except Exception as exc:
        logger.warning("Lock release failed: %s", exc)
        return False

aacquire_lock async

aacquire_lock(key, timeout=30, wait=5.0, poll_interval=0.05)

Asynchronously acquire a distributed lock for stampede protection.

Parameters:

Name Type Description Default
key str

Cache key to lock (lock suffix added internally).

required
timeout int

Lock auto-expiry in seconds (deadlock protection).

30
wait float

Max seconds to poll before giving up.

5.0
poll_interval float

Sleep between poll attempts in seconds.

0.05

Returns:

Type Description
Optional[str]

A token string if acquired, None if the lock could not be obtained within the wait period or on Redis failure.

Source code in fast_cache/backends/redis.py
async def aacquire_lock(
    self,
    key: str,
    timeout: int = 30,
    wait: float = 5.0,
    poll_interval: float = 0.05,
) -> Optional[str]:
    """
    Asynchronously try to take the distributed lock guarding ``key``.

    Each attempt is a ``SET ... NX EX timeout`` of a fresh random token on the
    lock key (``"<key>:_lock"`` after namespacing). Between attempts the
    coroutine yields via ``asyncio.sleep``.

    Args:
        key: Cache key to lock (lock suffix added internally).
        timeout: Lock auto-expiry in seconds (deadlock protection).
        wait: Max seconds to poll before giving up.
        poll_interval: Sleep between poll attempts in seconds.

    Returns:
        The token identifying this holder if the lock was acquired, or None
        when the wait period elapsed or the Redis call raised.
    """
    lock_name = self._make_key(f"{key}:_lock")
    owner_token = uuid.uuid4().hex
    give_up_at = time.monotonic() + wait

    try:
        # Keep retrying while the NX write is refused (lock held elsewhere).
        while not await self._async_client.set(
            lock_name, owner_token, nx=True, ex=timeout
        ):
            if time.monotonic() >= give_up_at:
                return None
            await asyncio.sleep(poll_interval)
        return owner_token
    except Exception as exc:
        logger.warning("Lock aacquire failed: %s", exc)
        return None

arelease_lock async

arelease_lock(key, token)

Asynchronously release a distributed lock only if the caller still owns it.

Uses a Lua script to atomically check the token and delete, preventing release of a lock re-acquired by another process.

Parameters:

Name Type Description Default
key str

Cache key that was locked.

required
token str

The token returned by aacquire_lock.

required

Returns:

Type Description
bool

True if the lock was released, False otherwise.

Source code in fast_cache/backends/redis.py
async def arelease_lock(self, key: str, token: str) -> bool:
    """
    Asynchronously release the lock for ``key`` if ``token`` still owns it.

    The ownership check and the delete are delegated to the module's
    ``_UNLOCK_SCRIPT``, evaluated server-side with the lock key and the token.

    Args:
        key: Cache key that was locked.
        token: The token returned by aacquire_lock.

    Returns:
        True if the script reported a release (result ``1``), False otherwise,
        including when the Redis call raised.
    """
    lock_name = self._make_key(f"{key}:_lock")
    try:
        outcome = await self._async_client.eval(_UNLOCK_SCRIPT, 1, lock_name, token)
        return outcome == 1
    except Exception as exc:
        logger.warning("Lock arelease failed: %s", exc)
        return False

close async

close()

Close Redis connections and clean up pools.

Source code in fast_cache/backends/redis.py
async def close(self) -> None:
    """
    Close Redis connections and clean up pools.

    Every teardown step is attempted even if an earlier one raises, so a
    failure while closing the async side cannot leak the synchronous client
    and pool. The first exception raised still propagates to the caller.
    """
    try:
        try:
            await self._async_client.close()
        finally:
            await self._async_pool.disconnect()
    finally:
        # Always release the synchronous resources, whatever happened above.
        try:
            self._sync_client.close()
        finally:
            self._sync_pool.disconnect()

fast_cache.PostgresBackend

PostgresBackend(dsn, namespace='fastapi', min_size=1, max_size=10, cleanup_interval=30, auto_cleanup=True)

Bases: CacheBackend


              flowchart TD
              fast_cache.PostgresBackend[PostgresBackend]
              fast_cache.backends.backend.CacheBackend[CacheBackend]

                              fast_cache.backends.backend.CacheBackend --> fast_cache.PostgresBackend
                


              click fast_cache.PostgresBackend href "" "fast_cache.PostgresBackend"
              click fast_cache.backends.backend.CacheBackend href "" "fast_cache.backends.backend.CacheBackend"
            

PostgreSQL cache backend implementation.

Uses an UNLOGGED TABLE for performance and lazy expiration.

Initializes a new instance of the PostgresBackend cache.

This backend uses a PostgreSQL database to store cache entries in an UNLOGGED TABLE for improved performance. It supports both synchronous and asynchronous operations, lazy expiration, and periodic cleanup of expired entries.

Parameters:

Name Type Description Default
dsn str

The PostgreSQL DSN (Data Source Name) string used to connect to the database.

required
namespace str

A namespace prefix for all cache keys. This allows multiple independent caches to share the same database table. Only alphanumeric characters and underscores are allowed. Defaults to "fastapi".

'fastapi'
min_size int

The minimum number of connections to maintain in the connection pool. Defaults to 1.

1
max_size int

The maximum number of connections allowed in the connection pool. Defaults to 10.

10
cleanup_interval int

The interval, in seconds, at which the background cleanup job runs to remove expired cache entries. Defaults to 30 seconds.

30
auto_cleanup bool

If True, automatically starts the background cleanup scheduler on initialization. Defaults to True.

True

Raises:

Type Description
ImportError

If the required psycopg[pool] package is not installed.

ValueError

If the provided namespace contains invalid characters.

Notes
  • The backend creates the cache table and an index on the expiration column if they do not already exist.
  • The cleanup scheduler can be started or stopped manually.
  • Both synchronous and asynchronous connection pools are managed.
Source code in fast_cache/backends/postgres.py
def __init__(
    self,
    dsn: str,
    namespace: str = "fastapi",
    min_size: int = 1,
    max_size: int = 10,
    cleanup_interval: int = 30,
    auto_cleanup: bool = True,
) -> None:
    """
    Initializes a new instance of the PostgresBackend cache.

    This backend uses a PostgreSQL database to store cache entries in an
    UNLOGGED TABLE for improved performance. It supports both synchronous and
    asynchronous operations, lazy expiration, and periodic cleanup of expired
    entries.

    Args:
        dsn (str): The PostgreSQL DSN (Data Source Name) string used to connect
            to the database.
        namespace (str, optional): A namespace prefix for all cache keys. It is
            also used to derive the table name (``"<namespace>_cache_store"``).
            Only alphanumeric characters and underscores are allowed.
            Defaults to "fastapi".
        min_size (int, optional): The minimum number of connections to maintain
            in each connection pool. Defaults to 1.
        max_size (int, optional): The maximum number of connections allowed in
            each connection pool. Defaults to 10.
        cleanup_interval (int, optional): The interval, in seconds, at which the
            background cleanup job runs to remove expired cache entries.
            Defaults to 30 seconds.
        auto_cleanup (bool, optional): If True, automatically starts the
            background cleanup scheduler on initialization. Defaults to True.

    Raises:
        ImportError: If the required `psycopg[pool]` package is not installed.
        ValueError: If the provided namespace contains invalid characters.

    Notes:
        - The backend creates the cache table and an index on the expiration
          column if they do not already exist.
        - The cleanup scheduler can be started or stopped manually.
        - Both synchronous and asynchronous connection pools are managed.
    """
    # Imported lazily so the package stays importable without the optional
    # PostgreSQL dependency.
    try:
        from psycopg_pool import AsyncConnectionPool, ConnectionPool
    except ImportError as exc:
        # Chain the original error so the traceback shows which import failed.
        raise ImportError(
            "PostgresBackend requires the 'psycopg[pool]' package. "
            "Install it with: pip install fast-cache[postgres]"
        ) from exc

    self._namespace = _validate_namespace(namespace)
    self._table_name = f"{namespace}_cache_store"

    # Only the synchronous pool is opened here. The async pool is created
    # closed (open=False); the async methods await _ensure_async_pool_open()
    # before using it, which presumably opens it on first use.
    self._sync_pool = ConnectionPool(
        conninfo=dsn, min_size=min_size, max_size=max_size, open=True
    )
    self._async_pool = AsyncConnectionPool(
        conninfo=dsn, min_size=min_size, max_size=max_size, open=False
    )
    self._create_unlogged_table_if_not_exists()

    self._cleanup_interval = cleanup_interval
    self._auto_cleanup = auto_cleanup

    self._scheduler = None
    self._scheduler_lock = threading.Lock()
    self._async_pool_lock = asyncio.Lock()

    if self._auto_cleanup:
        self._start_cleanup_scheduler()

set

set(key, value, expire=None)

Stores a value in the cache under the specified key.

If the key already exists, its value and expiration time are updated. Optionally, an expiration time can be set, after which the entry will be considered expired and eligible for deletion.

Parameters:

Name Type Description Default
key str

The cache key to store the value under.

required
value Any

The Python object/values to cache. It will be serialized using pickle.

required
expire Optional[Union[int, timedelta]]

The expiration time for the cache entry. Can be specified as an integer (seconds) or a timedelta. If None, the entry does not expire.

None
Notes
  • The key is automatically namespaced.
  • Expired entries are lazily deleted on access or by the cleanup job.
Source code in fast_cache/backends/postgres.py
def set(
    self, key: str, value: Any, expire: Optional[Union[int, timedelta]] = None
) -> None:
    """
    Upsert a pickled value under ``key`` using the synchronous pool.

    An existing row for the key has both its value and its expiration
    replaced (``INSERT ... ON CONFLICT (key) DO UPDATE``).

    Args:
        key (str): The cache key to store the value under; it is namespaced
            via ``_make_key``.
        value (Any): The Python object to cache. It is serialized with pickle.
        expire (Optional[Union[int, timedelta]], optional): Expiration as
            seconds or a timedelta, converted by ``_compute_expire_at``.

    Notes:
        - The write is committed explicitly before the connection is returned.
    """
    deadline = self._compute_expire_at(expire)
    with self._sync_pool.connection() as connection:
        with connection.cursor() as cursor:
            upsert_sql = f"""
                INSERT INTO {self._table_name} (key, value, expire_at)
                VALUES (%s, %s, %s)
                ON CONFLICT (key)
                DO UPDATE SET value = EXCLUDED.value,
                              expire_at = EXCLUDED.expire_at;
                """
            row_values = (self._make_key(key), pickle.dumps(value), deadline)
            cursor.execute(upsert_sql, row_values)
            connection.commit()

get

get(key, default=None)

Retrieves a value from the cache by key.

If the key does not exist or the entry has expired, returns default. If the entry is expired, it is deleted from the cache (lazy deletion).

Parameters:

Name Type Description Default
key str

The cache key to retrieve.

required
default Any

Value to return if key is not found. Defaults to None.

None

Returns:

Name Type Description
Any Any

The cached Python object, or default if not found or expired.

Notes
  • The value is deserialized using pickle.
  • Expired entries are removed on access.
Source code in fast_cache/backends/postgres.py
def get(self, key: str, default: Any = None) -> Any:
    """
    Retrieves a value from the cache by key.

    If the key does not exist or the entry has expired, returns default. If the
    entry is expired, it is deleted from the cache (lazy deletion).

    Args:
        key (str): The cache key to retrieve.
        default (Any): Value to return if key is not found. Defaults to None.

    Returns:
        Any: The cached Python object, or default if not found or expired.

    Notes:
        - The value is deserialized using pickle, which is only safe when the
          database contents are trusted.
        - Expired entries are removed on access. The removal runs after the
          read connection has been returned to the pool.
    """
    with self._sync_pool.connection() as conn:
        with conn.cursor() as cur:
            cur.execute(
                f"SELECT value, expire_at FROM {self._table_name} WHERE key = %s;",
                (self._make_key(key),),
            )
            row = cur.fetchone()

    # The pooled connection is released at this point. delete() checks out its
    # own connection, so calling it inside the block above would hold two
    # connections per expired hit and could stall on an exhausted pool.
    if not row:
        return default
    value, expire_at = row
    if self._is_expired(expire_at):
        self.delete(key)  # Lazy delete
        return default
    return pickle.loads(value)

delete

delete(key)

Deletes a cache entry by key.

If the key does not exist, this method does nothing.

Parameters:

Name Type Description Default
key str

The cache key to delete.

required
Notes
  • The key is automatically namespaced.
Source code in fast_cache/backends/postgres.py
def delete(self, key: str) -> None:
    """
    Remove the row stored under ``key`` using the synchronous pool.

    Issuing the DELETE for a key that has no row is a no-op.

    Args:
        key (str): The cache key to delete; it is namespaced via ``_make_key``.

    Notes:
        - The delete is committed explicitly before the connection is returned.
    """
    with self._sync_pool.connection() as connection:
        with connection.cursor() as cursor:
            delete_sql = f"DELETE FROM {self._table_name} WHERE key = %s;"
            cursor.execute(delete_sql, (self._make_key(key),))
            connection.commit()

has

has(key)

Checks if a cache key exists and is not expired.

Parameters:

Name Type Description Default
key str

The cache key to check.

required

Returns:

Name Type Description
bool bool

True if the key exists and is not expired, False otherwise.

Notes
  • Expired entries are not considered present.
  • Does not remove expired entries; use get for lazy deletion.
Source code in fast_cache/backends/postgres.py
def has(self, key: str) -> bool:
    """
    Report whether ``key`` has a row that is not expired.

    Args:
        key (str): The cache key to check; it is namespaced via ``_make_key``.

    Returns:
        bool: False when no row exists; otherwise the negation of
        ``_is_expired`` for the row's ``expire_at``.

    Notes:
        - Expired rows are left in place; this method only reads.
    """
    with self._sync_pool.connection() as connection:
        with connection.cursor() as cursor:
            lookup_sql = f"SELECT expire_at FROM {self._table_name} WHERE key = %s;"
            cursor.execute(lookup_sql, (self._make_key(key),))
            found = cursor.fetchone()
            if found:
                return not self._is_expired(found[0])
            return False

clear

clear()

Removes all cache entries in the current namespace.

This method deletes all rows from the cache table whose keys match the current namespace prefix.

Notes
  • Only entries in the current namespace are affected.
  • This operation can be expensive if the cache is large.
Source code in fast_cache/backends/postgres.py
def clear(self) -> None:
    """
    Delete every row whose key matches this backend's namespace pattern.

    The pattern is whatever ``_make_key("%")`` produces, used with SQL
    ``LIKE`` against this backend's table.

    Notes:
        - The delete is committed explicitly before the connection is returned.
        - This operation can be expensive if the cache is large.
    """
    with self._sync_pool.connection() as connection:
        with connection.cursor() as cursor:
            # "%" is the LIKE wildcard standing in for the un-namespaced key.
            # NOTE(review): LIKE also treats "_" as a single-character
            # wildcard, so a namespaced prefix containing underscores matches
            # more than the literal prefix. Confirm whether that matters.
            purge_sql = f"DELETE FROM {self._table_name} WHERE key LIKE %s;"
            cursor.execute(purge_sql, (self._make_key("%"),))
            connection.commit()

aset async

aset(key, value, expire=None)

Asynchronously stores a value in the cache under the specified key.

If the key already exists, its value and expiration time are updated. Optionally, an expiration time can be set, after which the entry will be considered expired and eligible for deletion.

Parameters:

Name Type Description Default
key str

The cache key to store the value under.

required
value Any

The Python object/values to cache. It will be serialized using pickle.

required
expire Optional[Union[int, timedelta]]

The expiration time for the cache entry. Can be specified as an integer (seconds) or a timedelta. If None, the entry does not expire.

None
Notes
  • Uses the asynchronous connection pool.
  • The key is automatically namespaced.
Source code in fast_cache/backends/postgres.py
async def aset(
    self, key: str, value: Any, expire: Optional[Union[int, timedelta]] = None
) -> None:
    """
    Asynchronously stores a value in the cache under the specified key.

    If the key already exists, its value and expiration time are updated.
    Optionally, an expiration time can be set, after which the entry will be
    considered expired and eligible for deletion.

    Args:
        key (str): The cache key to store the value under.
        value (Any): The Python object/values to cache. It will be serialized using pickle.
        expire (Optional[Union[int, timedelta]], optional): The expiration time
            for the cache entry. Can be specified as an integer (seconds) or a
            timedelta. If None, the entry does not expire.

    Notes:
        - Uses the asynchronous connection pool.
        - The key is automatically namespaced.
    """
    await self._ensure_async_pool_open()
    expire_at = self._compute_expire_at(expire)
    async with self._async_pool.connection() as conn:
        async with conn.cursor() as cur:
            await cur.execute(
                f"""
                INSERT INTO {self._table_name} (key, value, expire_at)
                VALUES (%s, %s, %s)
                ON CONFLICT (key)
                DO UPDATE SET value = EXCLUDED.value,
                              expire_at = EXCLUDED.expire_at;
                """,
                (self._make_key(key), pickle.dumps(value), expire_at),
            )
            await conn.commit()

aget async

aget(key, default=None)

Asynchronously retrieves a value from the cache by key.

If the key does not exist or the entry has expired, returns default. If the entry is expired, it is deleted from the cache (lazy deletion).

Parameters:

Name Type Description Default
key str

The cache key to retrieve.

required
default Any

Value to return if key is not found. Defaults to None.

None

Returns:

Name Type Description
Any Any

The cached Python object, or default if not found or expired.

Notes
  • Uses the asynchronous connection pool.
  • The value is deserialized using pickle.
  • Expired entries are removed on access.
Source code in fast_cache/backends/postgres.py
async def aget(self, key: str, default: Any = None) -> Any:
    """
    Asynchronously retrieves a value from the cache by key.

    If the key does not exist or the entry has expired, returns default. If the
    entry is expired, it is deleted from the cache (lazy deletion).

    Args:
        key (str): The cache key to retrieve.
        default (Any): Value to return if key is not found. Defaults to None.

    Returns:
        Any: The cached Python object, or default if not found or expired.

    Notes:
        - Uses the asynchronous connection pool.
        - The value is deserialized using pickle.
        - Expired entries are removed on access.
    """
    await self._ensure_async_pool_open()
    async with self._async_pool.connection() as conn:
        async with conn.cursor() as cur:
            await cur.execute(
                f"SELECT value, expire_at FROM {self._table_name} WHERE key = %s;",
                (self._make_key(key),),
            )
            row = await cur.fetchone()
            if not row:
                return default
            value, expire_at = row
            if self._is_expired(expire_at):
                await self.adelete(key)  # Lazy delete
                return default
            return pickle.loads(value)

adelete async

adelete(key)

Asynchronously deletes a cache entry by key.

If the key does not exist, this method does nothing.

Parameters:

Name Type Description Default
key str

The cache key to delete.

required
Notes
  • Uses the asynchronous connection pool.
  • The key is automatically namespaced.
Source code in fast_cache/backends/postgres.py
async def adelete(self, key: str) -> None:
    """
    Asynchronously deletes a cache entry by key.

    If the key does not exist, this method does nothing.

    Args:
        key (str): The cache key to delete.

    Notes:
        - Uses the asynchronous connection pool.
        - The key is automatically namespaced.
    """
    await self._ensure_async_pool_open()
    async with self._async_pool.connection() as conn:
        async with conn.cursor() as cur:
            await cur.execute(
                f"DELETE FROM {self._table_name} WHERE key = %s;",
                (self._make_key(key),),
            )
            await conn.commit()

ahas async

ahas(key)

Asynchronously checks if a cache key exists and is not expired.

Parameters:

Name Type Description Default
key str

The cache key to check.

required

Returns:

Name Type Description
bool bool

True if the key exists and is not expired, False otherwise.

Notes
  • Uses the asynchronous connection pool.
  • Expired entries are not considered present.
  • Does not remove expired entries; use aget for lazy deletion.
Source code in fast_cache/backends/postgres.py
async def ahas(self, key: str) -> bool:
    """
    Asynchronously checks if a cache key exists and is not expired.

    Args:
        key (str): The cache key to check.

    Returns:
        bool: True if the key exists and is not expired, False otherwise.

    Notes:
        - Uses the asynchronous connection pool.
        - Expired entries are not considered present.
        - Does not remove expired entries; use `aget` for lazy deletion.
    """
    await self._ensure_async_pool_open()
    async with self._async_pool.connection() as conn:
        async with conn.cursor() as cur:
            await cur.execute(
                f"SELECT expire_at FROM {self._table_name} WHERE key = %s;",
                (self._make_key(key),),
            )
            row = await cur.fetchone()
            if not row:
                return False
            return not self._is_expired(row[0])

aclear async

aclear()

Asynchronously removes all cache entries in the current namespace.

This method deletes all rows from the cache table whose keys match the current namespace prefix.

Notes
  • Uses the asynchronous connection pool.
  • Only entries in the current namespace are affected.
  • This operation can be expensive if the cache is large.
Source code in fast_cache/backends/postgres.py
async def aclear(self) -> None:
    """
    Asynchronously delete every cache row whose key matches the namespace pattern.

    The pattern is built by passing "%" through `_make_key` and is matched
    with SQL `LIKE` against the `key` column.

    Notes:
        - Runs on the asynchronous connection pool and commits the delete.
        - Can be expensive when the table holds many rows.
    """
    await self._ensure_async_pool_open()
    async with self._async_pool.connection() as conn, conn.cursor() as cur:
        statement = f"DELETE FROM {self._table_name} WHERE key LIKE %s;"
        # presumably yields "<namespace prefix>%"; confirm against _make_key
        like_pattern = self._make_key("%")
        await cur.execute(statement, (like_pattern,))
        await conn.commit()

aclose async

aclose()

Asynchronously closes the connection pools and stops the cleanup scheduler.

This method should be called when the backend is no longer needed to ensure all resources are released and background jobs are stopped.

Notes
  • Closes both synchronous and asynchronous connection pools.
  • Stops the background cleanup scheduler.
Source code in fast_cache/backends/postgres.py
async def aclose(self) -> None:
    """
    Stop the cleanup scheduler and close both connection pools.

    Call this once the backend is no longer needed so that its pools and
    background cleanup are shut down.

    Notes:
        - The scheduler is stopped first.
        - The synchronous pool is closed directly; the asynchronous pool's
          close is awaited.
        - A pool that is unset (falsy) is skipped.
    """
    self._stop_cleanup_scheduler()

    sync_pool = self._sync_pool
    if sync_pool:
        sync_pool.close()

    async_pool = self._async_pool
    if async_pool:
        await async_pool.close()

close

close()

Closes the synchronous connection pool and stops the cleanup scheduler.

This method should be called when the backend is no longer needed to ensure all resources are released and background jobs are stopped.

Notes
  • Only closes the synchronous connection pool.
  • Stops the background cleanup scheduler.
Source code in fast_cache/backends/postgres.py
def close(self) -> None:
    """
    Stop the cleanup scheduler and close the synchronous connection pool.

    Call this once the backend is no longer needed.

    Notes:
        - The asynchronous pool is not touched by this method.
        - An unset (falsy) synchronous pool is skipped.
    """
    self._stop_cleanup_scheduler()
    sync_pool = self._sync_pool
    if not sync_pool:
        return
    sync_pool.close()

fast_cache.MemcachedBackend

MemcachedBackend(host, port, *, pool_size=10, pool_minsize=1, namespace='fastapi_cache')

Bases: CacheBackend


              flowchart TD
              fast_cache.MemcachedBackend[MemcachedBackend]
              fast_cache.backends.backend.CacheBackend[CacheBackend]

                              fast_cache.backends.backend.CacheBackend --> fast_cache.MemcachedBackend
                


              click fast_cache.MemcachedBackend href "" "fast_cache.MemcachedBackend"
              click fast_cache.backends.backend.CacheBackend href "" "fast_cache.backends.backend.CacheBackend"
            

Initializes a new instance of the MemcachedBackend cache.

This backend provides a cache using Memcached as the storage layer. It supports both synchronous and asynchronous operations, and uses a namespace prefix for all keys to avoid collisions.

Parameters:

Name Type Description Default
host str

The hostname or IP address of the Memcached server.

required
port int

The port number of the Memcached server.

required
pool_size int

The maximum number of connections in the async pool. Defaults to 10.

10
pool_minsize int

The minimum number of connections in the async pool. Defaults to 1.

1
namespace str

Prefix for all cache keys. Defaults to "fastapi_cache".

'fastapi_cache'

Raises:

Type Description
ImportError

If the required aiomcache or pymemcache packages are not installed.

Notes
  • Both synchronous and asynchronous Memcached clients are initialized.
  • A single async client is created when the backend is initialized.
  • All cache keys are automatically namespaced.
Source code in fast_cache/backends/memcached.py
def __init__(
    self,
    host: str,
    port: int,
    *,
    pool_size: int = 10,
    pool_minsize: int = 1,
    namespace: str = "fastapi_cache",
) -> None:
    """
    Create the synchronous and asynchronous Memcached clients.

    Args:
        host (str): Hostname or IP address of the Memcached server.
        port (int): Port number of the Memcached server.
        pool_size (int): Maximum number of connections in the async pool.
        pool_minsize (int): Minimum number of connections in the async pool.
        namespace (str): Namespace stored on the backend for cache keys.

    Raises:
        ImportError: If `aiomcache` or `pymemcache` cannot be imported.
    """
    try:
        import aiomcache
        from pymemcache.client.base import PooledClient
    except ImportError as exc:
        # Chain the original error so the missing module stays visible.
        raise ImportError(
            "MemcachedBackend requires 'aiomcache' and 'pymemcache'. "
            "Install with: pip install fastapi-cachekit[memcached]"
        ) from exc
    self._namespace = namespace
    self._host = host
    self._port = port

    # Sync client.
    # NOTE(review): its pool size is fixed at 10 and does not follow `pool_size`.
    self._sync_client = PooledClient(
        (host, port),
        max_pool_size=10,
    )
    # Async client, sized from the pool_size / pool_minsize arguments.
    self._async_client = aiomcache.Client(
        host,
        port,
        pool_size=pool_size,
        pool_minsize=pool_minsize,
    )

get

get(key, default=None)

Synchronously retrieves a value from the cache by key.

If the key does not exist, returns default.

Parameters:

Name Type Description Default
key str

The cache key to retrieve.

required
default Any

Value to return if key is not found. Defaults to None.

None

Returns:

Name Type Description
Any Any

The cached Python object, or default if not found.

Notes
  • The value is deserialized using pickle.
  • Handles deserialization errors gracefully.
  • Thread-safe for Memcached client.
Source code in fast_cache/backends/memcached.py
def get(self, key: str, default: Any = None) -> Any:
    """
    Fetch and unpickle the value stored under a key using the sync client.

    Args:
        key (str): The cache key to retrieve.
        default (Any): Returned when nothing usable is stored. Defaults to None.

    Returns:
        Any: The unpickled object, or `default` when the client returns a
        falsy payload or any exception is raised.

    Notes:
        - Any exception (client or unpickling) is logged as a warning and
          `default` is returned instead.
    """
    try:
        payload = self._sync_client.get(self._make_key(key))
        if payload:
            return pickle.loads(payload)
        return default
    except Exception as e:
        logger.warning("Cache get failed: %s", e)
        return default

set

set(key, value, expire=None)

Synchronously stores a value in the cache under the specified key.

If the key already exists, its value and expiration time are updated. Optionally, an expiration time can be set, after which the entry will be considered expired and eligible for deletion.

Parameters:

Name Type Description Default
key str

The cache key to store the value under.

required
value Any

The Python object to cache.

required
expire Optional[Union[int, timedelta]]

The expiration time for the cache entry. Can be specified as an integer (seconds) or a timedelta. If None, the entry does not expire.

None
Notes
  • The value is serialized using pickle.
  • Thread-safe for Memcached client.
  • Expiration is handled by Memcached.
Source code in fast_cache/backends/memcached.py
def set(
    self, key: str, value: Any, expire: Optional[Union[int, timedelta]] = None
) -> None:
    """
    Synchronously store a pickled value under the given key.

    Args:
        key (str): The cache key to store the value under.
        value (Any): The Python object to cache.
        expire (Optional[Union[int, timedelta]], optional): Lifetime of the
            entry, in seconds or as a timedelta. None or 0 means no expiry.

    Notes:
        - The value is serialized using pickle.
        - Memcached treats expiry values above 30 days as an absolute Unix
          timestamp, so longer lifetimes are converted to "now + lifetime"
          before being sent.
        - Any exception is logged as a warning and not re-raised.
    """
    import time

    # Largest value Memcached still interprets as a relative offset.
    max_relative_seconds = 60 * 60 * 24 * 30
    try:
        exptime = (
            int(expire.total_seconds())
            if isinstance(expire, timedelta)
            else (expire or 0)
        )
        if exptime > max_relative_seconds:
            # Without this the server would read the value as a date in 1970
            # and expire the entry immediately.
            exptime = int(time.time()) + exptime
        self._sync_client.set(
            self._make_key(key), pickle.dumps(value), expire=exptime
        )
    except Exception as e:
        logger.warning("Cache set failed: %s", e)

delete

delete(key)

Synchronously deletes a cache entry by key.

If the key does not exist, this method does nothing.

Parameters:

Name Type Description Default
key str

The cache key to delete.

required
Notes
  • Thread-safe for Memcached client.
  • The key is automatically namespaced.
Source code in fast_cache/backends/memcached.py
def delete(self, key: str) -> None:
    """
    Remove the entry stored under a key using the sync client.

    Args:
        key (str): The cache key to delete.

    Notes:
        - The key is passed through `_make_key` before deletion.
        - Any exception is logged as a warning and not re-raised.
    """
    try:
        full_key = self._make_key(key)
        self._sync_client.delete(full_key)
    except Exception as e:
        logger.warning("Cache delete failed: %s", e)

clear

clear()

Synchronously removes all cache entries from Memcached.

Memcached does not support namespace-based clearing, so this operation flushes the entire cache, removing all entries regardless of namespace.

Notes
  • Thread-safe for Memcached client.
  • This operation affects all keys in the Memcached instance.
  • Use with caution in shared environments.
Source code in fast_cache/backends/memcached.py
def clear(self) -> None:
    """
    Flush the Memcached instance through the sync client.

    This issues `flush_all`, which is not limited to this backend's
    namespace: every entry on the server is removed.

    Notes:
        - Use with caution when the Memcached instance is shared.
        - Any exception is logged as a warning and not re-raised.
    """
    sync_client = self._sync_client
    try:
        sync_client.flush_all()
    except Exception as e:
        logger.warning("Cache clear failed: %s", e)

has

has(key)

Synchronously checks if a cache key exists.

Parameters:

Name Type Description Default
key str

The cache key to check.

required

Returns:

Name Type Description
bool bool

True if the key exists, False otherwise.

Notes
  • Thread-safe for Memcached client.
  • Expired entries are not considered present.
Source code in fast_cache/backends/memcached.py
def has(self, key: str) -> bool:
    """
    Check through the sync client whether a key currently holds a value.

    Args:
        key (str): The cache key to check.

    Returns:
        bool: True when the client returns anything other than None for the
        key; False when it returns None or any exception is raised.

    Notes:
        - Existence is determined by fetching the stored value.
        - Exceptions are logged as warnings and reported as "not present".
    """
    try:
        stored = self._sync_client.get(self._make_key(key))
        return stored is not None
    except Exception as e:
        logger.warning("Cache has failed: %s", e)
        return False

aget async

aget(key, default=None)

Asynchronously retrieves a value from the cache by key.

If the key does not exist, returns default.

Parameters:

Name Type Description Default
key str

The cache key to retrieve.

required
default Any

Value to return if key is not found. Defaults to None.

None

Returns:

Name Type Description
Any Any

The cached Python object, or default if not found.

Notes
  • The value is deserialized using pickle.
  • Handles deserialization errors gracefully.
  • Asyncio-safe for Memcached client.
Source code in fast_cache/backends/memcached.py
async def aget(self, key: str, default: Any = None) -> Any:
    """
    Fetch and unpickle the value stored under a key using the async client.

    Args:
        key (str): The cache key to retrieve.
        default (Any): Returned when nothing usable is stored. Defaults to None.

    Returns:
        Any: The unpickled object, or `default` when the client returns a
        falsy payload or any exception is raised.

    Notes:
        - Any exception (client or unpickling) is logged as a warning and
          `default` is returned instead.
    """
    try:
        payload = await self._async_client.get(self._make_key(key))
        if payload:
            return pickle.loads(payload)
        return default
    except Exception as e:
        logger.warning("Cache aget failed: %s", e)
        return default

aset async

aset(key, value, expire=None)

Asynchronously stores a value in the cache under the specified key.

If the key already exists, its value and expiration time are updated. Optionally, an expiration time can be set, after which the entry will be considered expired and eligible for deletion.

Parameters:

Name Type Description Default
key str

The cache key to store the value under.

required
value Any

The Python object to cache.

required
expire Optional[Union[int, timedelta]]

The expiration time for the cache entry. Can be specified as an integer (seconds) or a timedelta. If None, the entry does not expire.

None
Notes
  • The value is serialized using pickle.
  • Asyncio-safe for Memcached client.
  • Expiration is handled by Memcached.
Source code in fast_cache/backends/memcached.py
async def aset(
    self, key: str, value: Any, expire: Optional[Union[int, timedelta]] = None
) -> None:
    """
    Asynchronously store a pickled value under the given key.

    Args:
        key (str): The cache key to store the value under.
        value (Any): The Python object to cache.
        expire (Optional[Union[int, timedelta]], optional): Lifetime of the
            entry, in seconds or as a timedelta. None or 0 means no expiry.

    Notes:
        - The value is serialized using pickle.
        - Memcached treats expiry values above 30 days as an absolute Unix
          timestamp, so longer lifetimes are converted to "now + lifetime"
          before being sent.
        - Any exception is logged as a warning and not re-raised.
    """
    import time

    # Largest value Memcached still interprets as a relative offset.
    max_relative_seconds = 60 * 60 * 24 * 30
    try:
        exptime = (
            int(expire.total_seconds())
            if isinstance(expire, timedelta)
            else (expire or 0)
        )
        if exptime > max_relative_seconds:
            # Without this the server would read the value as a date in 1970
            # and expire the entry immediately.
            exptime = int(time.time()) + exptime
        await self._async_client.set(
            self._make_key(key), pickle.dumps(value), exptime=exptime
        )
    except Exception as e:
        logger.warning("Cache aset failed: %s", e)

adelete async

adelete(key)

Asynchronously deletes a cache entry by key.

If the key does not exist, this method does nothing.

Parameters:

Name Type Description Default
key str

The cache key to delete.

required
Notes
  • Asyncio-safe for Memcached client.
  • The key is automatically namespaced.
Source code in fast_cache/backends/memcached.py
async def adelete(self, key: str) -> None:
    """
    Remove the entry stored under a key using the async client.

    Args:
        key (str): The cache key to delete.

    Notes:
        - The key is passed through `_make_key` before deletion.
        - Any exception is logged as a warning and not re-raised.
    """
    try:
        full_key = self._make_key(key)
        await self._async_client.delete(full_key)
    except Exception as e:
        logger.warning("Cache adelete failed: %s", e)

aclear async

aclear()

Asynchronously removes all cache entries from Memcached.

Memcached does not support namespace-based clearing, so this operation flushes the entire cache, removing all entries regardless of namespace.

Notes
  • Asyncio-safe for Memcached client.
  • This operation affects all keys in the Memcached instance.
  • Use with caution in shared environments.
Source code in fast_cache/backends/memcached.py
async def aclear(self) -> None:
    """
    Flush the Memcached instance through the async client.

    This awaits `flush_all`, which is not limited to this backend's
    namespace: every entry on the server is removed.

    Notes:
        - Use with caution when the Memcached instance is shared.
        - Any exception is logged as a warning and not re-raised.
    """
    async_client = self._async_client
    try:
        await async_client.flush_all()
    except Exception as e:
        logger.warning("Cache aclear failed: %s", e)

ahas async

ahas(key)

Asynchronously checks if a cache key exists.

Parameters:

Name Type Description Default
key str

The cache key to check.

required

Returns:

Name Type Description
bool bool

True if the key exists, False otherwise.

Notes
  • Asyncio-safe for Memcached client.
  • Expired entries are not considered present.
Source code in fast_cache/backends/memcached.py
async def ahas(self, key: str) -> bool:
    """
    Check through the async client whether a key currently holds a value.

    Args:
        key (str): The cache key to check.

    Returns:
        bool: True when the client returns anything other than None for the
        key; False when it returns None or any exception is raised.

    Notes:
        - Existence is determined by fetching the stored value.
        - Exceptions are logged as warnings and reported as "not present".
    """
    try:
        return (await self._async_client.get(self._make_key(key))) is not None
    except Exception as e:
        logger.warning("Cache ahas failed: %s", e)
        return False

close async

close()

Asynchronously closes both the async and sync Memcached clients.

This method should be called when the backend is no longer needed to ensure all resources are released.

Notes
  • After calling this method, the backend cannot be used.
  • Closes both the async and sync clients.
Source code in fast_cache/backends/memcached.py
async def close(self) -> None:
    """
    Asynchronously close both the async and sync Memcached clients.

    Call this once the backend is no longer needed so that both clients
    release their resources.

    Notes:
        - Each client is closed in its own try block, so a failure while
          closing the async client does not leave the sync client open.
        - Failures are logged as warnings and not re-raised.
    """
    try:
        await self._async_client.close()
    except Exception as e:
        logger.warning("Cache close failed: %s", e)

    try:
        self._sync_client.close()
    except Exception as e:
        logger.warning("Cache close failed: %s", e)

fast_cache.MongoDBBackend

MongoDBBackend(uri, namespace='fastapi_cache')

Bases: CacheBackend


              flowchart TD
              fast_cache.MongoDBBackend[MongoDBBackend]
              fast_cache.backends.backend.CacheBackend[CacheBackend]

                              fast_cache.backends.backend.CacheBackend --> fast_cache.MongoDBBackend
                


              click fast_cache.MongoDBBackend href "" "fast_cache.MongoDBBackend"
              click fast_cache.backends.backend.CacheBackend href "" "fast_cache.backends.backend.CacheBackend"
            

MongoDB cache backend with both sync and async support. Uses a TTL index for automatic expiration of cache entries.

Each cache entry is stored as a document with
  • _id: the cache key (optionally namespaced)
  • value: the pickled cached value
  • expires_at: epoch time when the entry should expire

Expired documents are deleted automatically by MongoDB's TTL monitor, but expiration is also checked in code to avoid returning stale data.

Initialize the MongoDB backend.

Parameters:

Name Type Description Default
uri str

MongoDB connection URI (should include the database name).

required
namespace Optional[str]

Optional prefix for all cache keys and the collection name. Defaults to "fastapi_cache".

'fastapi_cache'

Raises: ImportError: If pymongo is not installed.

Source code in fast_cache/backends/mongodb.py
def __init__(self, uri: str, namespace: Optional[str] = "fastapi_cache") -> None:
    """
    Initialize the MongoDB backend.

    Args:
        uri (str): MongoDB connection URI (should include the database name).
        namespace (Optional[str]): Optional prefix for all cache keys and the collection name.
                                   Defaults to "fastapi_cache". A falsy value
                                   falls back to "cache".
    Raises:
        ImportError: If pymongo is not installed.
    """
    try:
        import pymongo
    except ImportError as exc:
        # Chain the original error so the missing module stays visible.
        raise ImportError(
            "MongoDBBackend requires 'pymongo>=4.6.0'. "
            "Install with: pip install fastapi-cachekit[mongodb]"
        ) from exc
    self._namespace = namespace or "cache"

    # Sync client: the collection is named after the namespace and lives in
    # the database named by the URI.
    self._sync_client = pymongo.MongoClient(uri)
    self._sync_db = self._sync_client.get_default_database()
    self._sync_collection = self._sync_db[self._namespace]
    # NOTE(review): MongoDB TTL indexes only act on date-typed fields, while
    # the read paths compare `expires_at` against time.time(); confirm what
    # type `_compute_expire_at` stores, otherwise this index never deletes.
    self._sync_collection.create_index("expires_at", expireAfterSeconds=0)

    # Async client
    self._async_client = pymongo.AsyncMongoClient(uri)
    self._async_db = self._async_client.get_default_database()
    self._async_collection = self._async_db[self._namespace]

get

get(key, default=None)

Synchronously retrieve a value from the cache.

Parameters:

Name Type Description Default
key str

The cache key.

required
default Any

Value to return if key is not found. Defaults to None.

None

Returns:

Name Type Description
Any Any

The cached value, or default if not found or expired.

Source code in fast_cache/backends/mongodb.py
def get(self, key: str, default: Any = None) -> Any:
    """
    Look up a key synchronously and return its unpickled value.

    Args:
        key (str): The cache key.
        default (Any): Returned when no usable entry exists. Defaults to None.

    Returns:
        Any: The cached value, or `default` when no document is found, its
        `expires_at` is not later than the current time, or unpickling fails.
        A document without `expires_at` is treated as never expiring.
    """
    doc = self._sync_collection.find_one({"_id": self._make_key(key)})
    if not doc:
        return default
    still_valid = doc.get("expires_at", float("inf")) > time.time()
    if not still_valid:
        return default
    try:
        return pickle.loads(doc["value"])
    except Exception:
        return default

set

set(key, value, expire=None)

Synchronously set a value in the cache.

Parameters:

Name Type Description Default
key str

The cache key.

required
value Any

The value to cache.

required
expire Optional[Union[int, timedelta]]

Expiration time in seconds or as timedelta. If None, the entry never expires.

None
Source code in fast_cache/backends/mongodb.py
def set(
    self, key: str, value: Any, expire: Optional[Union[int, timedelta]] = None
) -> None:
    """
    Synchronously set a value in the cache.

    Args:
        key (str): The cache key.
        value (Any): The value to cache.
        expire (Optional[Union[int, timedelta]]): Expiration time in seconds or as timedelta.
                                                 If None, the entry never expires.

    Notes:
        - The value is pickled and upserted under the key from `_make_key`.
        - When `_compute_expire_at` yields None, any `expires_at` left over
          from an earlier write is removed, so the entry really does not
          expire.
    """
    fields = {"value": pickle.dumps(value)}
    operations = {"$set": fields}
    exptime = self._compute_expire_at(expire)
    if exptime is not None:
        fields["expires_at"] = exptime
    else:
        # Drop a stale expiry from a previous write of the same key.
        operations["$unset"] = {"expires_at": ""}

    self._sync_collection.update_one(
        {"_id": self._make_key(key)}, operations, upsert=True
    )

delete

delete(key)

Synchronously delete a value from the cache.

Parameters:

Name Type Description Default
key str

The cache key.

required
Source code in fast_cache/backends/mongodb.py
def delete(self, key: str) -> None:
    """
    Synchronously remove the document stored for a cache key.

    Args:
        key (str): The cache key; it is passed through `_make_key` to form
            the document `_id`.
    """
    selector = {"_id": self._make_key(key)}
    self._sync_collection.delete_one(selector)

clear

clear()

Synchronously clear all values from the namespace.

Source code in fast_cache/backends/mongodb.py
def clear(self) -> None:
    """
    Synchronously clear all values from the namespace.

    Deletes every document whose `_id` starts with "<namespace>:". The
    namespace is regex-escaped so characters such as "." or "+" in it are
    matched literally instead of widening the delete.
    """
    import re

    prefix_pattern = f"^{re.escape(self._namespace)}:"
    self._sync_collection.delete_many({"_id": {"$regex": prefix_pattern}})

has

has(key)

Synchronously check if a key exists in the cache.

Parameters:

Name Type Description Default
key str

The cache key.

required

Returns:

Name Type Description
bool bool

True if the key exists and is not expired, False otherwise.

Source code in fast_cache/backends/mongodb.py
def has(self, key: str) -> bool:
    """
    Synchronously report whether a non-expired entry exists for a key.

    Args:
        key (str): The cache key.

    Returns:
        bool: True when a document is found and its `expires_at` is later
        than the current time (a missing `expires_at` counts as never
        expiring); False otherwise.
    """
    doc = self._sync_collection.find_one({"_id": self._make_key(key)})
    if not doc:
        return False
    expires_at = doc.get("expires_at", float("inf"))
    return bool(expires_at > time.time())

aget async

aget(key, default=None)

Asynchronously retrieve a value from the cache.

Parameters:

Name Type Description Default
key str

The cache key.

required
default Any

Value to return if key is not found. Defaults to None.

None

Returns:

Name Type Description
Any Any

The cached value, or default if not found or expired.

Source code in fast_cache/backends/mongodb.py
async def aget(self, key: str, default: Any = None) -> Any:
    """
    Look up a key asynchronously and return its unpickled value.

    Args:
        key (str): The cache key.
        default (Any): Returned when no usable entry exists. Defaults to None.

    Returns:
        Any: The cached value, or `default` when no document is found, its
        `expires_at` is not later than the current time, or unpickling fails.
        A document without `expires_at` is treated as never expiring.
    """
    doc = await self._async_collection.find_one({"_id": self._make_key(key)})
    if not doc:
        return default
    still_valid = doc.get("expires_at", float("inf")) > time.time()
    if not still_valid:
        return default
    try:
        return pickle.loads(doc["value"])
    except Exception:
        return default

aset async

aset(key, value, expire=None)

Asynchronously set a value in the cache.

Parameters:

Name Type Description Default
key str

The cache key.

required
value Any

The value to cache.

required
expire Optional[Union[int, timedelta]]

Expiration time in seconds or as timedelta. If None, the entry never expires.

None
Source code in fast_cache/backends/mongodb.py
async def aset(
    self, key: str, value: Any, expire: Optional[Union[int, timedelta]] = None
) -> None:
    """
    Asynchronously set a value in the cache.

    Args:
        key (str): The cache key.
        value (Any): The value to cache.
        expire (Optional[Union[int, timedelta]]): Expiration time in seconds or as timedelta.
                                                 If None, the entry never expires.

    Notes:
        - The value is pickled and upserted under the key from `_make_key`.
        - When `_compute_expire_at` yields None, any `expires_at` left over
          from an earlier write is removed, so the entry really does not
          expire.
    """
    fields = {"value": pickle.dumps(value)}
    operations = {"$set": fields}
    exptime = self._compute_expire_at(expire)
    if exptime is not None:
        fields["expires_at"] = exptime
    else:
        # Drop a stale expiry from a previous write of the same key.
        operations["$unset"] = {"expires_at": ""}

    await self._async_collection.update_one(
        {"_id": self._make_key(key)}, operations, upsert=True
    )

adelete async

adelete(key)

Asynchronously delete a value from the cache.

Parameters:

Name Type Description Default
key str

The cache key.

required
Source code in fast_cache/backends/mongodb.py
async def adelete(self, key: str) -> None:
    """
    Asynchronously remove the document stored for a cache key.

    Args:
        key (str): The cache key; it is passed through `_make_key` to form
            the document `_id`.
    """
    selector = {"_id": self._make_key(key)}
    await self._async_collection.delete_one(selector)

aclear async

aclear()

Asynchronously clear all values from the namespace.

Source code in fast_cache/backends/mongodb.py
async def aclear(self) -> None:
    """
    Asynchronously clear all values from the namespace.

    Deletes every document whose `_id` starts with "<namespace>:". The
    namespace is regex-escaped so characters such as "." or "+" in it are
    matched literally instead of widening the delete.
    """
    import re

    prefix_pattern = f"^{re.escape(self._namespace)}:"
    await self._async_collection.delete_many(
        {"_id": {"$regex": prefix_pattern}}
    )

ahas async

ahas(key)

Asynchronously check if a key exists in the cache.

Parameters:

Name Type Description Default
key str

The cache key.

required

Returns:

Name Type Description
bool bool

True if the key exists and is not expired, False otherwise.

Source code in fast_cache/backends/mongodb.py
async def ahas(self, key: str) -> bool:
    """
    Asynchronously report whether a non-expired entry exists for a key.

    Args:
        key (str): The cache key.

    Returns:
        bool: True when a document is found and its `expires_at` is later
        than the current time (a missing `expires_at` counts as never
        expiring); False otherwise.
    """
    doc = await self._async_collection.find_one({"_id": self._make_key(key)})
    if not doc:
        return False
    expires_at = doc.get("expires_at", float("inf"))
    return bool(expires_at > time.time())

close

close()

Close the synchronous MongoDB client.

Source code in fast_cache/backends/mongodb.py
def close(self) -> None:
    """
    Close the synchronous MongoDB client.

    The asynchronous client is not touched by this method.
    """
    sync_client = self._sync_client
    sync_client.close()

aclose async

aclose()

Close both the synchronous and asynchronous MongoDB clients.

Source code in fast_cache/backends/mongodb.py
async def aclose(self) -> None:
    """
    Close both MongoDB clients.

    The synchronous client is closed first, then the asynchronous client's
    close is awaited.
    """
    sync_client = self._sync_client
    sync_client.close()
    async_client = self._async_client
    await async_client.close()

fast_cache.FirestoreBackend

FirestoreBackend(credential_path=None, namespace='fastapi_cache', collection_name='cache_entries', cleanup_interval=30, auto_cleanup=True)

Bases: CacheBackend


              flowchart TD
              fast_cache.FirestoreBackend[FirestoreBackend]
              fast_cache.backends.backend.CacheBackend[CacheBackend]

                              fast_cache.backends.backend.CacheBackend --> fast_cache.FirestoreBackend
                


              click fast_cache.FirestoreBackend href "" "fast_cache.FirestoreBackend"
              click fast_cache.backends.backend.CacheBackend href "" "fast_cache.backends.backend.CacheBackend"
            

Initializes a new instance of the FirestoreBackend cache.

This backend provides a cache using Google Cloud Firestore as the storage layer. It supports both synchronous and asynchronous operations, manual expiration management, and optional periodic cleanup of expired entries.

Parameters:

Name Type Description Default
credential_path Optional[str]

Path to the Firebase Admin SDK credentials file. If None, uses the GOOGLE_APPLICATION_CREDENTIALS environment variable. Defaults to None.

None
namespace Optional[str]

Optional prefix for all cache keys. Defaults to "fastapi_cache".

'fastapi_cache'
collection_name Optional[str]

Name of the Firestore collection to use for storing cache entries. Defaults to "cache_entries".

'cache_entries'
cleanup_interval int

Interval in seconds for periodic cleanup of expired entries. Defaults to 30.

30
auto_cleanup bool

Whether to automatically start the cleanup scheduler on initialization. Defaults to True.

True

Raises:

Type Description
ImportError

If the required google-cloud-firestore package is not installed.

Notes
  • The backend uses a hashed, namespaced key for each Firestore document.
  • Expired entries are managed via a custom expires_at field.
  • Both synchronous and asynchronous Firestore clients are initialized.
  • The cleanup scheduler can be started or stopped manually.
Source code in fast_cache/backends/google_firestore.py
def __init__(
    self,
    credential_path: Optional[str] = None,
    namespace: Optional[str] = "fastapi_cache",
    collection_name: Optional[str] = "cache_entries",
    cleanup_interval: int = 30,
    auto_cleanup: bool = True,
) -> None:
    """
    Create the Firestore clients and optionally start the cleanup scheduler.

    Args:
        credential_path (Optional[str]): Path to a service-account credentials
            file. When falsy, the clients are created without explicit
            credentials.
        namespace (Optional[str]): Namespace for cache keys. A falsy value
            falls back to "cache".
        collection_name (Optional[str]): Firestore collection used for cache
            entries. A falsy value falls back to "cache_entries".
        cleanup_interval (int): Interval stored for the cleanup scheduler.
        auto_cleanup (bool): When True, `_start_cleanup_scheduler` is called
            at the end of initialization.

    Raises:
        ImportError: If the Google Cloud Firestore packages cannot be imported.
    """
    try:
        from google.oauth2 import service_account
        from google.cloud import firestore
        from google.cloud.firestore_v1.async_client import AsyncClient
        from google.cloud.firestore_v1.client import Client
    except ImportError as exc:
        # Chain the original error so the missing module stays visible.
        raise ImportError(
            "FirestoreBackend requires 'google-cloud-firestore'. "
            "Install with: pip install fastapi-cachekit[firestore]"
        ) from exc

    self._namespace = namespace or "cache"
    self._collection_name = collection_name or "cache_entries"

    self._cleanup_task = None
    self._cleanup_interval = cleanup_interval
    self._auto_cleanup = auto_cleanup

    self._scheduler = None
    self._scheduler_lock = threading.Lock()

    if credential_path:
        # Explicitly load credentials from the provided path
        credentials = service_account.Credentials.from_service_account_file(
            credential_path
        )
        client_kwargs = {"credentials": credentials}
    else:
        # Rely on GOOGLE_APPLICATION_CREDENTIALS
        client_kwargs = {}

    # Both clients are built once, from the same credential choice.
    self._sync_db: Client = firestore.Client(**client_kwargs)
    self._async_db: AsyncClient = firestore.AsyncClient(**client_kwargs)

    if self._auto_cleanup:
        self._start_cleanup_scheduler()

get

get(key, default=None)

Synchronously retrieves a value from the cache by key.

If the key does not exist or the entry has expired, returns default. If the entry is expired, it is not automatically deleted.

Parameters:

Name Type Description Default
key str

The cache key to retrieve.

required
default Any

Value to return if key is not found. Defaults to None.

None

Returns:

Name Type Description
Any Any

The cached Python object, or default if not found or expired.

Notes
  • The value is deserialized using pickle.
  • Handles deserialization errors gracefully.
  • Thread-safe for Firestore client.
Source code in fast_cache/backends/google_firestore.py
def get(self, key: str, default: Any = None) -> Any:
    """
    Synchronously retrieves a value from the cache by key.

    Returns ``default`` when the document does not exist, when the entry is
    reported expired by ``self._is_expired``, or when the stored payload is
    missing or cannot be unpickled. This method never deletes anything, so
    expired or unreadable entries stay in the collection.

    Args:
        key (str): The cache key to retrieve.
        default (Any): Value to return on a miss. Defaults to None.

    Returns:
        Any: The cached Python object, or default on any kind of miss.

    Notes:
        - The value is deserialized using pickle.
        - A corrupt or truncated payload is treated as a cache miss.
    """
    doc_ref = self._sync_db.collection(self._collection_name).document(
        self._make_key(key)
    )
    doc = doc_ref.get()
    if not doc.exists:
        return default

    data = doc.to_dict()
    if self._is_expired(data.get("expires_at")):
        return default

    try:
        # NOTE(security): pickle.loads can execute arbitrary code. This is only
        # safe while the collection is writable by trusted code alone.
        return pickle.loads(data["value"])
    except (
        pickle.UnpicklingError,
        KeyError,
        # pickle documents that unpickling may also raise these for damaged,
        # truncated, or incompatible payloads; a bad entry is just a miss.
        EOFError,
        AttributeError,
        ImportError,
        IndexError,
        TypeError,
        ValueError,
    ):
        return default

set

set(key, value, expire=None)

Synchronously stores a value in the cache under the specified key.

If the key already exists, its value and expiration time are updated. Optionally, an expiration time can be set, after which the entry will be considered expired and eligible for deletion.

Parameters:

Name Type Description Default
key str

The cache key to store the value under.

required
value Any

The Python object to cache.

required
expire Optional[Union[int, timedelta]]

The expiration time for the cache entry. Can be specified as an integer (seconds) or a timedelta. If None, the entry does not expire.

None
Notes
  • The value is serialized using pickle.
  • Thread-safe for Firestore client.
Source code in fast_cache/backends/google_firestore.py
def set(
    self, key: str, value: Any, expire: Optional[Union[int, timedelta]] = None
) -> None:
    """
    Synchronously writes a pickled value to the document for ``key``.

    The document is written with ``DocumentReference.set``, so any previous
    content for the same key is replaced. An ``expires_at`` field is included
    only when ``self._compute_expire_at(expire)`` yields a non-None value.

    Args:
        key (str): The cache key to store the value under.
        value (Any): The Python object to cache (must be picklable).
        expire (Optional[Union[int, timedelta]], optional): Expiration given
            as seconds or a timedelta; converted by ``_compute_expire_at``.

    Notes:
        - The value is serialized using pickle.
    """
    target = self._sync_db.collection(self._collection_name).document(
        self._make_key(key)
    )
    payload = {"value": pickle.dumps(value)}
    expires_at = self._compute_expire_at(expire)
    if expires_at is not None:
        payload["expires_at"] = expires_at

    target.set(payload)

delete

delete(key)

Synchronously deletes a cache entry by key.

If the key does not exist, this method does nothing.

Parameters:

Name Type Description Default
key str

The cache key to delete.

required
Notes
  • Thread-safe for Firestore client.
  • The key is automatically namespaced and hashed.
Source code in fast_cache/backends/google_firestore.py
def delete(self, key: str) -> None:
    """
    Synchronously deletes the document that backs ``key``.

    The document ID is obtained from ``self._make_key(key)`` and the delete is
    issued unconditionally, without checking whether the document exists.

    Args:
        key (str): The cache key to delete.
    """
    collection = self._sync_db.collection(self._collection_name)
    collection.document(self._make_key(key)).delete()

clear

clear()

Synchronously removes all cache entries in the collection.

This method deletes all documents in the configured Firestore collection. Note that Firestore does not support direct namespace-based clearing, so all entries in the collection are removed.

Notes
  • Thread-safe for Firestore client.
  • This operation can be expensive if the collection is large.
  • For more granular clearing, consider adding a namespace field to documents.
Source code in fast_cache/backends/google_firestore.py
def clear(self) -> None:
    """
    Synchronously deletes every document in the configured collection.

    The whole collection is streamed and each document is deleted through its
    own reference. No namespace filter is applied, so entries written by other
    namespaces sharing the collection are removed as well.

    Notes:
        - One delete call is issued per document, which can be expensive for
          large collections.
    """
    collection = self._sync_db.collection(self._collection_name)
    for snapshot in collection.stream():
        snapshot.reference.delete()

has

has(key)

Synchronously checks if a cache key exists and is not expired.

Parameters:

Name Type Description Default
key str

The cache key to check.

required

Returns:

Name Type Description
bool bool

True if the key exists and is not expired, False otherwise.

Notes
  • Thread-safe for Firestore client.
  • Expired entries are not considered present.
Source code in fast_cache/backends/google_firestore.py
def has(self, key: str) -> bool:
    """
    Synchronously reports whether ``key`` is present and not expired.

    Args:
        key (str): The cache key to check.

    Returns:
        bool: False when the document does not exist; otherwise the negation
        of ``self._is_expired`` applied to the document's ``expires_at`` field
        (None when the field is absent).
    """
    snapshot = (
        self._sync_db.collection(self._collection_name)
        .document(self._make_key(key))
        .get()
    )
    if not snapshot.exists:
        return False
    fields = snapshot.to_dict()
    return not self._is_expired(fields.get("expires_at"))

aget async

aget(key, default=None)

Asynchronously retrieves a value from the cache by key.

If the key does not exist or the entry has expired, returns default. If the entry is expired, it is not automatically deleted.

Parameters:

Name Type Description Default
key str

The cache key to retrieve.

required
default Any

Value to return if key is not found. Defaults to None.

None

Returns:

Name Type Description
Any Any

The cached Python object, or default if not found or expired.

Notes
  • The value is deserialized using pickle.
  • Handles deserialization errors gracefully.
  • Asyncio-safe for Firestore client.
Source code in fast_cache/backends/google_firestore.py
async def aget(self, key: str, default: Any = None) -> Any:
    """
    Asynchronously retrieves a value from the cache by key.

    Returns ``default`` when the document does not exist, when the entry is
    reported expired by ``self._is_expired``, or when the stored payload is
    missing or cannot be unpickled. This method never deletes anything, so
    expired or unreadable entries stay in the collection.

    Args:
        key (str): The cache key to retrieve.
        default (Any): Value to return on a miss. Defaults to None.

    Returns:
        Any: The cached Python object, or default on any kind of miss.

    Notes:
        - The value is deserialized using pickle.
        - A corrupt or truncated payload is treated as a cache miss.
    """
    doc_ref = self._async_db.collection(self._collection_name).document(
        self._make_key(key)
    )
    doc = await doc_ref.get()
    if not doc.exists:
        return default

    data = doc.to_dict()
    if self._is_expired(data.get("expires_at")):
        return default

    try:
        # NOTE(security): pickle.loads can execute arbitrary code. This is only
        # safe while the collection is writable by trusted code alone.
        return pickle.loads(data["value"])
    except (
        pickle.UnpicklingError,
        KeyError,
        # pickle documents that unpickling may also raise these for damaged,
        # truncated, or incompatible payloads; a bad entry is just a miss.
        EOFError,
        AttributeError,
        ImportError,
        IndexError,
        TypeError,
        ValueError,
    ):
        return default

aset async

aset(key, value, expire=None)

Asynchronously stores a value in the cache under the specified key.

If the key already exists, its value and expiration time are updated. Optionally, an expiration time can be set, after which the entry will be considered expired and eligible for deletion.

Parameters:

Name Type Description Default
key str

The cache key to store the value under.

required
value Any

The Python object to cache.

required
expire Optional[Union[int, timedelta]]

The expiration time for the cache entry. Can be specified as an integer (seconds) or a timedelta. If None, the entry does not expire.

None
Notes
  • The value is serialized using pickle.
  • Asyncio-safe for Firestore client.
Source code in fast_cache/backends/google_firestore.py
async def aset(
    self, key: str, value: Any, expire: Optional[Union[int, timedelta]] = None
) -> None:
    """
    Asynchronously writes a pickled value to the document for ``key``.

    The document is written with ``DocumentReference.set``, so any previous
    content for the same key is replaced. An ``expires_at`` field is included
    only when ``self._compute_expire_at(expire)`` yields a non-None value,
    which is the same rule the synchronous ``set`` applies.

    Args:
        key (str): The cache key to store the value under.
        value (Any): The Python object to cache (must be picklable).
        expire (Optional[Union[int, timedelta]], optional): Expiration given
            as seconds or a timedelta; converted by ``_compute_expire_at``.

    Notes:
        - The value is serialized using pickle.
    """
    doc_ref = self._async_db.collection(self._collection_name).document(
        self._make_key(key)
    )
    data = {"value": pickle.dumps(value)}
    exptime = self._compute_expire_at(expire)

    # Guard on the computed timestamp, not the raw argument, so a None result
    # is never persisted as an explicit ``expires_at`` field.
    if exptime is not None:
        data["expires_at"] = exptime

    await doc_ref.set(data)

adelete async

adelete(key)

Asynchronously deletes a cache entry by key.

If the key does not exist, this method does nothing.

Parameters:

Name Type Description Default
key str

The cache key to delete.

required
Notes
  • Asyncio-safe for Firestore client.
  • The key is automatically namespaced and hashed.
Source code in fast_cache/backends/google_firestore.py
async def adelete(self, key: str) -> None:
    """
    Asynchronously deletes the document that backs ``key``.

    The document ID is obtained from ``self._make_key(key)`` and the delete is
    issued unconditionally, without checking whether the document exists.

    Args:
        key (str): The cache key to delete.
    """
    collection = self._async_db.collection(self._collection_name)
    await collection.document(self._make_key(key)).delete()

aclear async

aclear()

Asynchronously removes all cache entries in the collection.

This method deletes all documents in the configured Firestore collection. Note that Firestore does not support direct namespace-based clearing, so all entries in the collection are removed.

Notes
  • Asyncio-safe for Firestore client.
  • This operation can be expensive if the collection is large.
  • For more granular clearing, consider adding a namespace field to documents.
Source code in fast_cache/backends/google_firestore.py
async def aclear(self) -> None:
    """
    Asynchronously deletes every document in the configured collection.

    The whole collection is streamed and each document is deleted through its
    own reference, one awaited call at a time. No namespace filter is applied,
    so entries written by other namespaces sharing the collection are removed
    as well.

    Notes:
        - One delete call is issued per document, which can be expensive for
          large collections.
    """
    collection = self._async_db.collection(self._collection_name)
    async for snapshot in collection.stream():
        await snapshot.reference.delete()

ahas async

ahas(key)

Asynchronously checks if a cache key exists and is not expired.

Parameters:

Name Type Description Default
key str

The cache key to check.

required

Returns:

Name Type Description
bool bool

True if the key exists and is not expired, False otherwise.

Notes
  • Asyncio-safe for Firestore client.
  • Expired entries are not considered present.
Source code in fast_cache/backends/google_firestore.py
async def ahas(self, key: str) -> bool:
    """
    Asynchronously reports whether ``key`` is present and not expired.

    Args:
        key (str): The cache key to check.

    Returns:
        bool: False when the document does not exist; otherwise the negation
        of ``self._is_expired`` applied to the document's ``expires_at`` field
        (None when the field is absent).
    """
    target = self._async_db.collection(self._collection_name).document(
        self._make_key(key)
    )
    snapshot = await target.get()
    if not snapshot.exists:
        return False
    fields = snapshot.to_dict()
    return not self._is_expired(fields.get("expires_at"))

close

close()

Closes the synchronous Firestore client and stops the cleanup scheduler.

This method should be called when the backend is no longer needed to ensure all resources are released and background jobs are stopped.

Notes
  • After calling this method, the synchronous client is closed and cannot be used.
  • The background cleanup scheduler is stopped.
Source code in fast_cache/backends/google_firestore.py
def close(self) -> None:
    """
    Stops the cleanup scheduler and closes the synchronous Firestore client.

    Call this when the backend is no longer needed. The scheduler is stopped
    first via ``self._stop_cleanup_scheduler()``; the synchronous client is
    closed afterwards.

    Notes:
        - Only the synchronous client is closed here; the asynchronous client
          is handled by ``aclose``.
        - A TypeError raised while closing the client is ignored.
          NOTE(review): the reason for tolerating TypeError is not visible
          here; confirm before narrowing or widening it.
    """
    self._stop_cleanup_scheduler()
    try:
        self._sync_db.close()
    except TypeError:
        pass

aclose async

aclose()

Closes the asynchronous Firestore client and stops the cleanup scheduler.

This method should be called when the backend is no longer needed to ensure all resources are released and background jobs are stopped.

Notes
  • After calling this method, the asynchronous client is closed and cannot be used.
  • The background cleanup scheduler is stopped.
Source code in fast_cache/backends/google_firestore.py
async def aclose(self) -> None:
    """
    Stops the cleanup scheduler and closes the asynchronous Firestore client.

    Call this when the backend is no longer needed. The scheduler is stopped
    first via ``self._stop_cleanup_scheduler()``; the asynchronous client's
    ``close()`` is awaited afterwards.

    Notes:
        - Only the asynchronous client is closed here; the synchronous client
          is handled by ``close``.
        - A TypeError raised while closing the client is ignored.
          NOTE(review): the reason for tolerating TypeError is not visible
          here; confirm before narrowing or widening it.
    """
    self._stop_cleanup_scheduler()
    try:
        await self._async_db.close()
    except TypeError:
        pass

fast_cache.DynamoDBBackend

DynamoDBBackend(table_name, region_name, namespace='cache', aws_access_key_id=None, aws_secret_access_key=None, endpoint_url=None, create_table=True)

Bases: CacheBackend


              flowchart TD
              fast_cache.DynamoDBBackend[DynamoDBBackend]
              fast_cache.backends.backend.CacheBackend[CacheBackend]

                              fast_cache.backends.backend.CacheBackend --> fast_cache.DynamoDBBackend
                


              click fast_cache.DynamoDBBackend href "" "fast_cache.DynamoDBBackend"
              click fast_cache.backends.backend.CacheBackend href "" "fast_cache.backends.backend.CacheBackend"
            

DynamoDB cache backend implementation with namespace support.

Attributes:

Name Type Description
_namespace str

Namespace prefix for all keys.

_table_name str

DynamoDB table name.

_sync_client client

Synchronous DynamoDB client.

_async_client client

Asynchronous DynamoDB client.

_sync_resource resource

Synchronous DynamoDB resource.

_async_resource resource

Asynchronous DynamoDB resource.

Initialize DynamoDB backend with table and connection settings.

Parameters:

Name Type Description Default
table_name str

DynamoDB table name for cache storage.

required
namespace str

Namespace prefix for all keys (default: "cache").

'cache'
region_name str

AWS region name, e.g. "us-east-1". This argument is required and has no default.

required
aws_access_key_id Optional[str]

AWS access key ID.

None
aws_secret_access_key Optional[str]

AWS secret access key.

None
endpoint_url Optional[str]

Custom endpoint URL (for local DynamoDB).

None
create_table bool

Whether to create table if it doesn't exist.

True
Source code in fast_cache/backends/dynamodb.py
def __init__(
    self,
    table_name: str,
    region_name: str,
    namespace: str = "cache",
    aws_access_key_id: Optional[str] = None,
    aws_secret_access_key: Optional[str] = None,
    endpoint_url: Optional[str] = None,
    create_table: bool = True,
) -> None:
    """
    Initialize DynamoDB backend with table and connection settings.

    Args:
        table_name (str): DynamoDB table name for cache storage.
        region_name (str): AWS region name, e.g. "us-east-1". Required; there
            is no default.
        namespace (str): Namespace stored on the backend for key prefixing
            (default: "cache").
        aws_access_key_id (Optional[str]): AWS access key ID. Only used when
            ``aws_secret_access_key`` is also provided.
        aws_secret_access_key (Optional[str]): AWS secret access key. Only
            used when ``aws_access_key_id`` is also provided.
        endpoint_url (Optional[str]): Custom endpoint URL (for local DynamoDB).
        create_table (bool): When True (the default), calls
            ``self._ensure_table_exists()`` at the end of construction.

    Raises:
        ImportError: If ``boto3`` or ``aioboto3`` is not installed.
    """
    try:
        import boto3
        import aioboto3
    except ImportError as err:
        # Chain the original error so the missing module name stays visible.
        raise ImportError(
            "DynamoDBBackend requires the 'boto3' and 'aioboto3' packages. "
            "Install them with: pip install fastapi-cachekit[dynamodb]"
        ) from err

    self._namespace = namespace
    self._table_name = table_name

    # Connection parameters shared by every client/resource built below.
    self._connection_params = {
        "region_name": region_name,
        "endpoint_url": endpoint_url,
    }

    # Explicit credentials are added only when both halves are present.
    if aws_access_key_id and aws_secret_access_key:
        self._connection_params.update(
            {
                "aws_access_key_id": aws_access_key_id,
                "aws_secret_access_key": aws_secret_access_key,
            }
        )

    # Sync client for table management only
    self._sync_client = boto3.client("dynamodb", **self._connection_params)

    # Sync resource/table for sync cache operations
    self._sync_resource = boto3.resource("dynamodb", **self._connection_params)
    self._sync_table = self._sync_resource.Table(table_name)

    # Async resource and table start unset; only the session and the lock are
    # created here.
    self._async_resource = None
    self._async_table = None
    self._async_session = aioboto3.Session()
    self._async_table_lock = asyncio.Lock()

    # Create table if requested
    if create_table:
        self._ensure_table_exists()

get

get(key, default=None)

Synchronously retrieve a value from the cache.

Parameters:

Name Type Description Default
key str

The key to retrieve.

required
default Any

Value to return if key is not found. Defaults to None.

None

Returns:

Name Type Description
Any Any

The cached value, or default if not found.

Source code in fast_cache/backends/dynamodb.py
def get(self, key: str, default: Any = None) -> Any:
    """
    Synchronously retrieve a value from the cache.

    An item that ``self._is_expired`` reports as expired is deleted via
    ``self.delete`` and treated as a miss. Any exception raised along the way
    is logged as a warning and also treated as a miss.

    Args:
        key (str): The key to retrieve.
        default (Any): Value to return on a miss. Defaults to None.

    Returns:
        Any: The deserialized cached value, or default on a miss or failure.
    """
    try:
        result = self._sync_table.get_item(Key={"cache_key": self._make_key(key)})
        if "Item" in result:
            record = result["Item"]
            if not self._is_expired(record):
                return self._deserialize_value(record["value"])
            # Expired: evict eagerly, then fall through to the miss path.
            self.delete(key)
        return default
    except Exception as e:
        logger.warning("Cache get failed: %s", e)
        return default

aget async

aget(key, default=None)

Asynchronously retrieve a value from the cache.

Parameters:

Name Type Description Default
key str

The key to retrieve.

required
default Any

Value to return if key is not found. Defaults to None.

None

Returns:

Name Type Description
Any Any

The cached value, or default if not found.

Source code in fast_cache/backends/dynamodb.py
async def aget(self, key: str, default: Any = None) -> Any:
    """
    Asynchronously retrieve a value from the cache.

    An item that ``self._is_expired`` reports as expired is deleted via
    ``self.adelete`` and treated as a miss. Any exception raised along the way
    is logged as a warning and also treated as a miss.

    Args:
        key (str): The key to retrieve.
        default (Any): Value to return on a miss. Defaults to None.

    Returns:
        Any: The deserialized cached value, or default on a miss or failure.
    """
    try:
        table = await self._get_async_table()
        result = await table.get_item(Key={"cache_key": self._make_key(key)})
        if "Item" in result:
            record = result["Item"]
            if not self._is_expired(record):
                return self._deserialize_value(record["value"])
            # Expired: evict eagerly, then fall through to the miss path.
            await self.adelete(key)
        return default
    except Exception as e:
        logger.warning("Cache aget failed: %s", e)
        return default

set

set(key, value, expire=None)

Synchronously set a value in the cache.

Parameters:

Name Type Description Default
key str

The key under which to store the value.

required
value Any

The value to store.

required
expire Optional[Union[int, timedelta]]

Expiration time in seconds or as timedelta.

None
Source code in fast_cache/backends/dynamodb.py
def set(
    self, key: str, value: Any, expire: Optional[Union[int, timedelta]] = None
) -> None:
    """
    Synchronously set a value in the cache.

    The item is produced by ``self._build_item`` and written with ``put_item``.
    Failures are logged as warnings and not re-raised.

    Args:
        key (str): The key under which to store the value.
        value (Any): The value to store.
        expire (Optional[Union[int, timedelta]]): Expiration time in seconds or as timedelta.
    """
    try:
        self._sync_table.put_item(Item=self._build_item(key, value, expire))
    except Exception as e:
        logger.warning("Cache set failed: %s", e)

aset async

aset(key, value, expire=None)

Asynchronously set a value in the cache.

Parameters:

Name Type Description Default
key str

The key under which to store the value.

required
value Any

The value to store.

required
expire Optional[Union[int, timedelta]]

Expiration time in seconds or as timedelta.

None
Source code in fast_cache/backends/dynamodb.py
async def aset(
    self, key: str, value: Any, expire: Optional[Union[int, timedelta]] = None
) -> None:
    """
    Asynchronously set a value in the cache.

    The async table is obtained first, then the item produced by
    ``self._build_item`` is written with ``put_item``. Failures are logged as
    warnings and not re-raised.

    Args:
        key (str): The key under which to store the value.
        value (Any): The value to store.
        expire (Optional[Union[int, timedelta]]): Expiration time in seconds or as timedelta.
    """
    try:
        async_table = await self._get_async_table()
        await async_table.put_item(Item=self._build_item(key, value, expire))
    except Exception as e:
        logger.warning("Cache aset failed: %s", e)

delete

delete(key)

Synchronously delete a value from the cache.

Parameters:

Name Type Description Default
key str

The key to delete.

required
Source code in fast_cache/backends/dynamodb.py
def delete(self, key: str) -> None:
    """
    Synchronously delete a value from the cache.

    Issues ``delete_item`` for the key produced by ``self._make_key``. Failures
    are logged as warnings and not re-raised.

    Args:
        key (str): The key to delete.
    """
    try:
        primary_key = {"cache_key": self._make_key(key)}
        self._sync_table.delete_item(Key=primary_key)
    except Exception as e:
        logger.warning("Cache delete failed: %s", e)

adelete async

adelete(key)

Asynchronously delete a value from the cache.

Parameters:

Name Type Description Default
key str

The key to delete.

required
Source code in fast_cache/backends/dynamodb.py
async def adelete(self, key: str) -> None:
    """
    Asynchronously delete a value from the cache.

    Obtains the async table, then issues ``delete_item`` for the key produced
    by ``self._make_key``. Failures are logged as warnings and not re-raised.

    Args:
        key (str): The key to delete.
    """
    try:
        async_table = await self._get_async_table()
        primary_key = {"cache_key": self._make_key(key)}
        await async_table.delete_item(Key=primary_key)
    except Exception as e:
        logger.warning("Cache adelete failed: %s", e)

has

has(key)

Synchronously check if a key exists in the cache.

Parameters:

Name Type Description Default
key str

The key to check.

required

Returns:

Name Type Description
bool bool

True if the key exists, False otherwise.

Source code in fast_cache/backends/dynamodb.py
def has(self, key: str) -> bool:
    """
    Synchronously check if a key exists in the cache and has not expired.

    Only the ``cache_key`` and ``ttl`` attributes are requested. An item that
    ``self._is_expired`` reports as expired is deleted via ``self.delete`` and
    reported as absent. Any exception is logged as a warning and reported as
    absent.

    Args:
        key (str): The key to check.

    Returns:
        bool: True if the key exists and is not expired, False otherwise.
    """
    try:
        result = self._sync_table.get_item(
            Key={"cache_key": self._make_key(key)},
            ProjectionExpression="cache_key, #ttl",
            # "ttl" is requested through the "#ttl" placeholder name.
            ExpressionAttributeNames={"#ttl": "ttl"},
        )
        if "Item" in result:
            if not self._is_expired(result["Item"]):
                return True
            # Expired: evict eagerly, then report the key as absent.
            self.delete(key)
        return False
    except Exception as e:
        logger.warning("Cache has failed: %s", e)
        return False

ahas async

ahas(key)

Asynchronously check if a key exists in the cache.

Parameters:

Name Type Description Default
key str

The key to check.

required

Returns:

Name Type Description
bool bool

True if the key exists, False otherwise.

Source code in fast_cache/backends/dynamodb.py
async def ahas(self, key: str) -> bool:
    """
    Asynchronously check if a key exists in the cache and has not expired.

    Only the ``cache_key`` and ``ttl`` attributes are requested. An item that
    ``self._is_expired`` reports as expired is deleted via ``self.adelete`` and
    reported as absent. Any exception is logged as a warning and reported as
    absent.

    Args:
        key (str): The key to check.

    Returns:
        bool: True if the key exists and is not expired, False otherwise.
    """
    try:
        async_table = await self._get_async_table()
        result = await async_table.get_item(
            Key={"cache_key": self._make_key(key)},
            ProjectionExpression="cache_key, #ttl",
            # "ttl" is requested through the "#ttl" placeholder name.
            ExpressionAttributeNames={"#ttl": "ttl"},
        )
        if "Item" in result:
            if not self._is_expired(result["Item"]):
                return True
            # Expired: evict eagerly, then report the key as absent.
            await self.adelete(key)
        return False
    except Exception as e:
        logger.warning("Cache ahas failed: %s", e)
        return False

clear

clear()

Synchronously clear all values from the namespace.

Source code in fast_cache/backends/dynamodb.py
def clear(self) -> None:
    """
    Synchronously clear all values from the namespace.

    Scans the table page by page for items whose ``cache_key`` begins with
    ``"<namespace>:"`` and deletes each page's matches through a batch writer.
    Any exception is logged as a warning and stops the operation; items
    deleted before the failure stay deleted.
    """
    try:
        scan_args = {
            "FilterExpression": "begins_with(cache_key, :prefix)",
            "ExpressionAttributeValues": {":prefix": f"{self._namespace}:"},
            "ProjectionExpression": "cache_key",
        }

        while True:
            page = self._sync_table.scan(**scan_args)

            # A page may contain no matches even when more pages follow.
            if page.get("Items"):
                with self._sync_table.batch_writer() as writer:
                    for entry in page["Items"]:
                        writer.delete_item(Key={"cache_key": entry["cache_key"]})

            if "LastEvaluatedKey" not in page:
                break
            # Resume the scan where the previous page stopped.
            scan_args["ExclusiveStartKey"] = page["LastEvaluatedKey"]

    except Exception as e:
        logger.warning("Cache clear failed: %s", e)

aclear async

aclear()

Asynchronously clear all values from the namespace.

Source code in fast_cache/backends/dynamodb.py
async def aclear(self) -> None:
    """
    Asynchronously clear all values from the namespace.

    Scans the table page by page for items whose ``cache_key`` begins with
    ``"<namespace>:"`` and deletes each page's matches through an async batch
    writer. Any exception is logged as a warning and stops the operation;
    items deleted before the failure stay deleted.
    """
    try:
        async_table = await self._get_async_table()

        scan_args = {
            "FilterExpression": "begins_with(cache_key, :prefix)",
            "ExpressionAttributeValues": {":prefix": f"{self._namespace}:"},
            "ProjectionExpression": "cache_key",
        }

        while True:
            page = await async_table.scan(**scan_args)

            # A page may contain no matches even when more pages follow.
            if page.get("Items"):
                async with async_table.batch_writer() as writer:
                    for entry in page["Items"]:
                        await writer.delete_item(
                            Key={"cache_key": entry["cache_key"]}
                        )

            if "LastEvaluatedKey" not in page:
                break
            # Resume the scan where the previous page stopped.
            scan_args["ExclusiveStartKey"] = page["LastEvaluatedKey"]

    except Exception as e:
        logger.warning("Cache aclear failed: %s", e)

close async

close()

Close DynamoDB connections and clean up resources.

Source code in fast_cache/backends/dynamodb.py
async def close(self) -> None:
    """
    Close the async DynamoDB resource, if one was opened.

    When ``self._async_resource`` is set, its ``__aexit__`` is awaited and
    both the cached async resource and async table are reset to None. When it
    is not set, this is a no-op. The synchronous client and resource are not
    touched here.
    """
    resource = self._async_resource
    if not resource:
        return

    await resource.__aexit__(None, None, None)
    self._async_resource = None
    self._async_table = None

Backend Base Class

fast_cache.backends.backend.CacheBackend

Bases: ABC


              flowchart TD
              fast_cache.backends.backend.CacheBackend[CacheBackend]

              

              click fast_cache.backends.backend.CacheBackend href "" "fast_cache.backends.backend.CacheBackend"
            

Abstract base class for cache backends.

All cache backend implementations must inherit from this class and implement both synchronous and asynchronous methods for cache operations.

aget abstractmethod async

aget(key, default=None)

Asynchronously retrieve a value from the cache.

Parameters:

Name Type Description Default
key str

The key to retrieve.

required
default Any

Value to return if key is not found. Defaults to None.

None

Returns:

Name Type Description
Any Any

The cached value, or default if not found.

Source code in fast_cache/backends/backend.py
@abstractmethod
async def aget(self, key: str, default: Any = None) -> Any:
    """
    Asynchronously look up ``key`` in the cache.

    Abstract: concrete backends must override this coroutine.

    Args:
        key (str): The key to retrieve.
        default (Any): Fallback returned when the key is not found.
            Defaults to None.

    Returns:
        Any: The cached value, or default if not found.
    """
    ...

get abstractmethod

get(key, default=None)

Synchronously retrieve a value from the cache.

Parameters:

Name Type Description Default
key str

The key to retrieve.

required
default Any

Value to return if key is not found. Defaults to None.

None

Returns:

Name Type Description
Any Any

The cached value, or default if not found.

Source code in fast_cache/backends/backend.py
@abstractmethod
def get(self, key: str, default: Any = None) -> Any:
    """
    Synchronously look up ``key`` in the cache.

    Abstract: concrete backends must override this method.

    Args:
        key (str): The key to retrieve.
        default (Any): Fallback returned when the key is not found.
            Defaults to None.

    Returns:
        Any: The cached value, or default if not found.
    """
    ...

aset abstractmethod async

aset(key, value, expire=None)

Asynchronously set a value in the cache.

Parameters:

Name Type Description Default
key str

The key under which to store the value.

required
value Any

The value to store.

required
expire Optional[Union[int, timedelta]]

Expiration time in seconds or as timedelta.

None
Source code in fast_cache/backends/backend.py
@abstractmethod
async def aset(
    self, key: str, value: Any, expire: Optional[Union[int, timedelta]] = None
) -> None:
    """
    Asynchronously store ``value`` under ``key``.

    Abstract: concrete backends must override this coroutine.

    Args:
        key (str): The key under which to store the value.
        value (Any): The value to store.
        expire (Optional[Union[int, timedelta]]): Expiration time, either in
            seconds or as a timedelta.
    """
    ...

set abstractmethod

set(key, value, expire=None)

Synchronously set a value in the cache.

Parameters:

Name Type Description Default
key str

The key under which to store the value.

required
value Any

The value to store.

required
expire Optional[Union[int, timedelta]]

Expiration time in seconds or as a timedelta.

None
Source code in fast_cache/backends/backend.py
@abstractmethod
def set(
    self,
    key: str,
    value: Any,
    expire: Optional[Union[int, timedelta]] = None,
) -> None:
    """
    Store ``value`` under ``key`` (blocking variant).

    This is an abstract hook; concrete backends provide the write.

    Args:
        key (str): Cache key the value is filed under.
        value (Any): Object to store.
        expire (Optional[Union[int, timedelta]]): Lifetime of the entry,
            given as a number of seconds or as a timedelta.
    """

adelete abstractmethod async

adelete(key)

Asynchronously delete a value from the cache.

Parameters:

Name Type Description Default
key str

The key to delete.

required
Source code in fast_cache/backends/backend.py
@abstractmethod
async def adelete(self, key: str) -> None:
    """
    Remove the entry stored under ``key`` (coroutine variant).

    This is an abstract hook; concrete backends provide the removal.

    Args:
        key (str): Cache key whose entry should be removed.
    """

delete abstractmethod

delete(key)

Synchronously delete a value from the cache.

Parameters:

Name Type Description Default
key str

The key to delete.

required
Source code in fast_cache/backends/backend.py
@abstractmethod
def delete(self, key: str) -> None:
    """
    Remove the entry stored under ``key`` (blocking variant).

    This is an abstract hook; concrete backends provide the removal.

    Args:
        key (str): Cache key whose entry should be removed.
    """

aclear abstractmethod async

aclear()

Asynchronously clear all values from the cache.

Source code in fast_cache/backends/backend.py
@abstractmethod
async def aclear(self) -> None:
    """
    Remove every entry from the cache (coroutine variant).

    This is an abstract hook; concrete backends provide the purge.
    """

clear abstractmethod

clear()

Synchronously clear all values from the cache.

Source code in fast_cache/backends/backend.py
@abstractmethod
def clear(self) -> None:
    """
    Remove every entry from the cache (blocking variant).

    This is an abstract hook; concrete backends provide the purge.
    """

ahas abstractmethod async

ahas(key)

Asynchronously check if a key exists in the cache.

Parameters:

Name Type Description Default
key str

The key to check.

required

Returns:

Name Type Description
bool bool

True if the key exists, False otherwise.

Source code in fast_cache/backends/backend.py
@abstractmethod
async def ahas(self, key: str) -> bool:
    """
    Report whether ``key`` is present in the cache (coroutine variant).

    This is an abstract hook; concrete backends provide the check.

    Args:
        key (str): Cache key to test for.

    Returns:
        bool: True when an entry exists for the key, otherwise False.
    """

has abstractmethod

has(key)

Synchronously check if a key exists in the cache.

Parameters:

Name Type Description Default
key str

The key to check.

required

Returns:

Name Type Description
bool bool

True if the key exists, False otherwise.

Source code in fast_cache/backends/backend.py
@abstractmethod
def has(self, key: str) -> bool:
    """
    Report whether ``key`` is present in the cache (blocking variant).

    This is an abstract hook; concrete backends provide the check.

    Args:
        key (str): Cache key to test for.

    Returns:
        bool: True when an entry exists for the key, otherwise False.
    """