API Reference

Main Cache Interface

fast_cache.FastAPICache

FastAPICache()

FastAPI Cache Extension.

This class provides caching utilities for FastAPI applications, including decorator-based caching and dependency-injection-based backend access.

Initialize the FastAPICache instance.

Source code in fast_cache/integration.py
def __init__(self) -> None:
    """
    Initialize the FastAPICache instance.
    """
    self._backend: Optional[CacheBackend] = None
    self._app: Optional[FastAPI] = None
    self._default_expire: Optional[Union[int, timedelta]] = None
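
A minimal wiring sketch (hedged; it assumes the top-level imports implied by the section headings on this page, and InMemoryBackend is documented under Backends below):

from fastapi import FastAPI
from fast_cache import FastAPICache, InMemoryBackend

cache = FastAPICache()

# Register the lifespan handler so the backend is released on shutdown.
app = FastAPI(lifespan=cache.lifespan_handler)

# Attach a backend; see init_app below.
cache.init_app(app, backend=InMemoryBackend())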

get_cache

get_cache()

Get the configured cache backend for dependency injection.

Returns:
  • CacheBackend: The configured cache backend instance.

Raises:
  • RuntimeError: If the cache is not initialized.

Source code in fast_cache/integration.py
def get_cache(self) -> CacheBackend:
    """
    Get the configured cache backend for dependency injection.

    Returns:
        CacheBackend: The configured cache backend instance.

    Raises:
        RuntimeError: If the cache is not initialized.
    """
    if self._backend is None:
        raise RuntimeError("Cache not initialized. Call init_app first.")
    return self._backend
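
A sketch of get_cache used as a FastAPI dependency (the endpoint and key format are illustrative; cache and app are the instances from the sketch above):

from fastapi import Depends

@app.get("/items/{item_id}")
async def read_item(item_id: int, backend=Depends(cache.get_cache)):
    key = f"item:{item_id}"
    cached = await backend.aget(key)
    if cached is not None:
        return cached
    item = {"id": item_id}  # stand-in for a real lookup
    await backend.aset(key, item, expire=60)
    return item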

cached

cached(expire=None, key_builder=None, namespace=None)

Decorator for caching function results.

Parameters:
  • expire (Optional[Union[int, timedelta]], default None): Expiration time in seconds or as a timedelta.
  • key_builder (Optional[Callable[..., str]], default None): Custom function to build the cache key.
  • namespace (Optional[str], default None): Optional namespace for the cache key.

Returns:
  • Callable[[Callable[..., Any]], Callable[..., Any]]: A decorator that caches the function result.

Source code in fast_cache/integration.py
def cached(
    self,
    expire: Optional[Union[int, timedelta]] = None,
    key_builder: Optional[Callable[..., str]] = None,
    namespace: Optional[str] = None,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """
    Decorator for caching function results.

    Args:
        expire (Optional[Union[int, timedelta]]): Expiration time in seconds or as a timedelta.
        key_builder (Optional[Callable[..., str]]): Custom function to build the cache key.
        namespace (Optional[str]): Optional namespace for the cache key.

    Returns:
        Callable: A decorator that caches the function result.
    """

    def decorator(func: Callable) -> Callable[..., Any]:
        """
        The actual decorator that wraps the function.

        Args:
            func (Callable): The function to be cached.

        Returns:
            Callable: The wrapped function with caching.
        """
        is_async = inspect.iscoroutinefunction(func)

        def build_cache_key(*args, **kwargs) -> str:
            """
            Build the cache key for the function call.

            Args:
                *args: Positional arguments for the function.
                **kwargs: Keyword arguments for the function.

            Returns:
                str: The generated cache key.
            """
            if key_builder is not None:
                key = key_builder(*args, **kwargs)
            else:
                # Default key building logic
                key = f"{func.__module__}:{func.__name__}:{str(args)}:{str(kwargs)}"

            if namespace:
                key = f"{namespace}:{key}"

            return key

        @wraps(func)
        async def async_wrapper(*args, **kwargs) -> Any:
            """
            Async wrapper for caching.

            Args:
                *args: Positional arguments.
                **kwargs: Keyword arguments.

            Returns:
                Any: The cached or computed result.
            """
            if not self._backend:
                return await func(*args, **kwargs)

            # Skip cache if explicitly requested
            if kwargs.pop("skip_cache", False):
                return await func(*args, **kwargs)

            cache_key = build_cache_key(*args, **kwargs)

            # Try to get from cache
            cached_value = await self._backend.aget(cache_key)
            if cached_value is not None:
                return cached_value

            # Execute function and cache result
            result = await func(*args, **kwargs)
            await self._backend.aset(
                cache_key, result, expire=expire or self._default_expire
            )
            return result

        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            """
            Sync wrapper for caching.

            Args:
                *args: Positional arguments.
                **kwargs: Keyword arguments.

            Returns:
                Any: The cached or computed result.
            """
            if not self._backend:
                return func(*args, **kwargs)

            # Skip cache if explicitly requested
            if kwargs.pop("skip_cache", False):
                return func(*args, **kwargs)

            cache_key = build_cache_key(*args, **kwargs)

            # Try to get from cache
            cached_value = self._backend.get(cache_key)
            if cached_value is not None:
                return cached_value

            # Execute function and cache result
            result = func(*args, **kwargs)
            self._backend.set(
                cache_key, result, expire=expire or self._default_expire
            )
            return result

        return async_wrapper if is_async else sync_wrapper

    return decorator
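
A sketch of the decorator in use (the functions and the key_builder are illustrative):

from datetime import timedelta

@cache.cached(expire=timedelta(minutes=5), namespace="users")
async def load_user(user_id: int) -> dict:
    return {"id": user_id}  # stand-in for a real lookup

# A custom key builder receives the call's arguments and returns the key.
def user_key(user_id: int) -> str:
    return f"user:{user_id}"

@cache.cached(expire=60, key_builder=user_key)
async def load_profile(user_id: int) -> dict:
    ...

As the source above shows, passing skip_cache=True to a decorated function bypasses the cache for that single call, and calls made before init_app simply execute the wrapped function without caching.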

lifespan_handler async

lifespan_handler(app)

Lifespan context manager for FastAPI.

This can be used as the lifespan argument to FastAPI to manage cache lifecycle.

Parameters:
  • app (FastAPI, required): The FastAPI application instance.

Yields:
  • None

Source code in fast_cache/integration.py
@asynccontextmanager
async def lifespan_handler(self, app: FastAPI) -> AsyncIterator[None]:
    """
    Lifespan context manager for FastAPI.

    This can be used as the `lifespan` argument to FastAPI to manage
    cache lifecycle.

    Args:
        app (FastAPI): The FastAPI application instance.

    Yields:
        None
    """
    if not hasattr(app, "state"):
        app.state = {}
    app.state["cache"] = self

    try:
        yield
    finally:
        if self._backend:
            close = getattr(self._backend, "aclose", None)
            if close:
                await close()
            else:
                close = getattr(self._backend, "close", None)
                if close:
                    close()

        self._backend = None
        self._app = None
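
If the application defines its own lifespan, the handler can be entered from inside it (a sketch):

from contextlib import asynccontextmanager
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    async with cache.lifespan_handler(app):
        yield  # application runs here; the backend is closed on exit

app = FastAPI(lifespan=lifespan)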

init_app

init_app(app, backend, default_expire=None)

Initialize the cache extension.

Parameters:
  • app (FastAPI, required): FastAPI application instance.
  • backend (CacheBackend, required): Cache backend instance.
  • default_expire (Optional[Union[int, timedelta]], default None): Default expiration time for cached items.
Source code in fast_cache/integration.py
def init_app(
    self,
    app: FastAPI,
    backend: CacheBackend,
    default_expire: Optional[Union[int, timedelta]] = None,
) -> None:
    """
    Initialize the cache extension.

    Args:
        app (FastAPI): FastAPI application instance.
        backend (CacheBackend): Cache backend instance.
        default_expire (Optional[Union[int, timedelta]]): Default expiration time for cached items.
    """
    self._backend = backend
    self._app = app
    self._default_expire = default_expire
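
default_expire acts as a fallback for cached calls that do not pass expire, as the "expire or self._default_expire" expression in the decorator source shows. A sketch:

from datetime import timedelta

cache.init_app(app, backend=InMemoryBackend(), default_expire=timedelta(minutes=10))

@cache.cached()  # no expire given: entries use the 10-minute default
async def expensive_lookup(q: str) -> dict:
    ...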

Backends

fast_cache.InMemoryBackend

InMemoryBackend(namespace='fastapi-cache', max_size=None, cleanup_interval=30)

Bases: CacheBackend

Initializes a new instance of the InMemoryBackend cache.

This backend provides an in-memory cache with optional LRU (Least Recently Used) eviction, namespace support, thread and async safety, and automatic periodic cleanup of expired items. It is suitable for single-process, multi-threaded, or asyncio-based applications.

Parameters:
  • namespace (str, default "fastapi-cache"): A namespace prefix for all cache keys. This allows multiple independent caches to share the same process.
  • max_size (Optional[int], default None): The maximum number of items to store in the cache. If set, the cache evicts the least recently used items when the limit is exceeded. If None, the cache size is unlimited.
  • cleanup_interval (int, default 30): The interval, in seconds, at which the background cleanup job runs to remove expired cache entries.
Notes
  • The backend uses an OrderedDict to maintain LRU order.
  • Both synchronous (thread-safe) and asynchronous (asyncio-safe) operations are supported.
  • Expired items are removed automatically by a background scheduler.
  • This backend is not suitable for multi-process or distributed environments.

Initialize the in-memory cache backend.

Parameters:
  • namespace (str, default "fastapi-cache"): Namespace prefix for all keys.
  • max_size (Optional[int], default None): Optional maximum number of items (LRU eviction if set).
  • cleanup_interval (int, default 30): Interval in seconds for background cleanup.
Source code in fast_cache/backends/memory.py
def __init__(
    self,
    namespace: str = "fastapi-cache",
    max_size: Optional[int] = None,
    cleanup_interval: int = 30,
) -> None:
    """
    Initialize the in-memory cache backend.

    Args:
        namespace: Namespace prefix for all keys.
        max_size: Optional maximum number of items (LRU eviction if set).
        cleanup_interval: Interval in seconds for background cleanup.
    """
    self._namespace = namespace
    self._cache: OrderedDict[str, Tuple[Any, Optional[float]]] = OrderedDict()
    self._lock = threading.Lock()
    self._async_lock = asyncio.Lock()
    self._max_size = max_size
    self._cleanup_interval = cleanup_interval

    self._scheduler = None
    self._scheduler_lock = threading.Lock()
    self._start_cleanup_scheduler()
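
Constructing the backend (values are illustrative):

backend = InMemoryBackend(
    namespace="my-app",    # keys are stored under the "my-app:" prefix
    max_size=1000,         # evict least recently used entries beyond 1000
    cleanup_interval=60,   # purge expired entries every 60 seconds
)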

get

get(key)

Synchronously retrieves a value from the cache by key.

If the key does not exist or the entry has expired, returns None. If the entry is expired, it is deleted from the cache (lazy deletion). Accessing an item moves it to the end of the LRU order.

Parameters:
  • key (str, required): The cache key to retrieve.

Returns:
  • Optional[Any]: The cached Python object, or None if not found or expired.

Notes
  • Thread-safe.
  • Expired entries are removed on access.
  • Updates LRU order on access.
Source code in fast_cache/backends/memory.py
def get(self, key: str) -> Optional[Any]:
    """
    Synchronously retrieves a value from the cache by key.

    If the key does not exist or the entry has expired, returns None. If the
    entry is expired, it is deleted from the cache (lazy deletion). Accessing
    an item moves it to the end of the LRU order.

    Args:
        key (str): The cache key to retrieve.

    Returns:
        Optional[Any]: The cached Python object, or None if not found or expired.

    Notes:
        - Thread-safe.
        - Expired entries are removed on access.
        - Updates LRU order on access.
    """
    k = self._make_key(key)
    with self._lock:
        item = self._cache.get(k)
        if item:
            value, expire_time = item
            if not self._is_expired(expire_time):
                self._cache.move_to_end(k)
                return value
            self._cache.pop(k, None)
        return None

set

set(key, value, expire=None)

Synchronously stores a value in the cache under the specified key.

If the key already exists, its value and expiration time are updated. Optionally, an expiration time can be set, after which the entry will be considered expired and eligible for deletion. Setting an item moves it to the end of the LRU order.

Parameters:
  • key (str, required): The cache key to store the value under.
  • value (Any, required): The Python object to cache.
  • expire (Optional[Union[int, timedelta]], default None): The expiration time for the cache entry. Can be specified as an integer (seconds) or a timedelta. If None, the entry does not expire.
Notes
  • Thread-safe.
  • Triggers LRU eviction if max_size is set.
  • Updates LRU order on set.
Source code in fast_cache/backends/memory.py
def set(
    self, key: str, value: Any, expire: Optional[Union[int, timedelta]] = None
) -> None:
    """
    Synchronously stores a value in the cache under the specified key.

    If the key already exists, its value and expiration time are updated.
    Optionally, an expiration time can be set, after which the entry will be
    considered expired and eligible for deletion. Setting an item moves it to
    the end of the LRU order.

    Args:
        key (str): The cache key to store the value under.
        value (Any): The Python object to cache.
        expire (Optional[Union[int, timedelta]], optional): The expiration time
            for the cache entry. Can be specified as an integer (seconds) or a
            timedelta. If None, the entry does not expire.

    Notes:
        - Thread-safe.
        - Triggers LRU eviction if max_size is set.
        - Updates LRU order on set.
    """
    k = self._make_key(key)
    expire_time = self._get_expire_time(expire)
    with self._lock:
        self._cache[k] = (value, expire_time)
        self._cache.move_to_end(k)
        self._evict_if_needed()

delete

delete(key)

Synchronously deletes a cache entry by key.

If the key does not exist, this method does nothing.

Parameters:
  • key (str, required): The cache key to delete.
Notes
  • Thread-safe.
  • The key is automatically namespaced.
Source code in fast_cache/backends/memory.py
def delete(self, key: str) -> None:
    """
    Synchronously deletes a cache entry by key.

    If the key does not exist, this method does nothing.

    Args:
        key (str): The cache key to delete.

    Notes:
        - Thread-safe.
        - The key is automatically namespaced.
    """
    k = self._make_key(key)
    with self._lock:
        self._cache.pop(k, None)

clear

clear()

Synchronously removes all cache entries in the current namespace.

This method deletes all entries whose keys match the current namespace prefix.

Notes
  • Thread-safe.
  • Only entries in the current namespace are affected.
  • This operation can be expensive if the cache is large.
Source code in fast_cache/backends/memory.py
def clear(self) -> None:
    """
    Synchronously removes all cache entries in the current namespace.

    This method deletes all entries whose keys match the current namespace prefix.

    Notes:
        - Thread-safe.
        - Only entries in the current namespace are affected.
        - This operation can be expensive if the cache is large.
    """
    prefix = f"{self._namespace}:"
    with self._lock:
        keys_to_delete = [k for k in self._cache if k.startswith(prefix)]
        for k in keys_to_delete:
            self._cache.pop(k, None)

has

has(key)

Synchronously checks if a cache key exists and is not expired.

Parameters:
  • key (str, required): The cache key to check.

Returns:
  • bool: True if the key exists and is not expired, False otherwise.

Notes
  • Thread-safe.
  • Expired entries are not considered present and are removed on check.
  • Updates LRU order on access.
Source code in fast_cache/backends/memory.py
def has(self, key: str) -> bool:
    """
    Synchronously checks if a cache key exists and is not expired.

    Args:
        key (str): The cache key to check.

    Returns:
        bool: True if the key exists and is not expired, False otherwise.

    Notes:
        - Thread-safe.
        - Expired entries are not considered present and are removed on check.
        - Updates LRU order on access.
    """
    k = self._make_key(key)
    with self._lock:
        item = self._cache.get(k)
        if item:
            _, expire_time = item
            if not self._is_expired(expire_time):
                self._cache.move_to_end(k)
                return True
            self._cache.pop(k, None)
        return False
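
A synchronous round trip through the methods above (a sketch):

from datetime import timedelta

backend.set("greeting", {"msg": "hello"}, expire=timedelta(seconds=30))
assert backend.has("greeting")
print(backend.get("greeting"))  # {'msg': 'hello'}
backend.delete("greeting")
assert backend.get("greeting") is None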

aget async

aget(key)

Asynchronously retrieves a value from the cache by key.

If the key does not exist or the entry has expired, returns None. If the entry is expired, it is deleted from the cache (lazy deletion). Accessing an item moves it to the end of the LRU order.

Parameters:
  • key (str, required): The cache key to retrieve.

Returns:
  • Optional[Any]: The cached Python object, or None if not found or expired.

Notes
  • Asyncio-safe.
  • Expired entries are removed on access.
  • Updates LRU order on access.
Source code in fast_cache/backends/memory.py
async def aget(self, key: str) -> Optional[Any]:
    """
    Asynchronously retrieves a value from the cache by key.

    If the key does not exist or the entry has expired, returns None. If the
    entry is expired, it is deleted from the cache (lazy deletion). Accessing
    an item moves it to the end of the LRU order.

    Args:
        key (str): The cache key to retrieve.

    Returns:
        Optional[Any]: The cached Python object, or None if not found or expired.

    Notes:
        - Asyncio-safe.
        - Expired entries are removed on access.
        - Updates LRU order on access.
    """
    k = self._make_key(key)
    async with self._async_lock:
        item = self._cache.get(k)
        if item:
            value, expire_time = item
            if not self._is_expired(expire_time):
                self._cache.move_to_end(k)
                return value
            self._cache.pop(k, None)
        return None

aset async

aset(key, value, expire=None)

Asynchronously stores a value in the cache under the specified key.

If the key already exists, its value and expiration time are updated. Optionally, an expiration time can be set, after which the entry will be considered expired and eligible for deletion. Setting an item moves it to the end of the LRU order.

Parameters:
  • key (str, required): The cache key to store the value under.
  • value (Any, required): The Python object to cache.
  • expire (Optional[Union[int, timedelta]], default None): The expiration time for the cache entry. Can be specified as an integer (seconds) or a timedelta. If None, the entry does not expire.
Notes
  • Asyncio-safe.
  • Triggers LRU eviction if max_size is set.
  • Updates LRU order on set.
Source code in fast_cache/backends/memory.py
async def aset(
    self, key: str, value: Any, expire: Optional[Union[int, timedelta]] = None
) -> None:
    """
    Asynchronously stores a value in the cache under the specified key.

    If the key already exists, its value and expiration time are updated.
    Optionally, an expiration time can be set, after which the entry will be
    considered expired and eligible for deletion. Setting an item moves it to
    the end of the LRU order.

    Args:
        key (str): The cache key to store the value under.
        value (Any): The Python object to cache.
        expire (Optional[Union[int, timedelta]], optional): The expiration time
            for the cache entry. Can be specified as an integer (seconds) or a
            timedelta. If None, the entry does not expire.

    Notes:
        - Asyncio-safe.
        - Triggers LRU eviction if max_size is set.
        - Updates LRU order on set.
    """
    k = self._make_key(key)
    expire_time = self._get_expire_time(expire)
    async with self._async_lock:
        self._cache[k] = (value, expire_time)
        self._cache.move_to_end(k)
        self._evict_if_needed()

adelete async

adelete(key)

Asynchronously deletes a cache entry by key.

If the key does not exist, this method does nothing.

Parameters:
  • key (str, required): The cache key to delete.
Notes
  • Asyncio-safe.
  • The key is automatically namespaced.
Source code in fast_cache/backends/memory.py
async def adelete(self, key: str) -> None:
    """
    Asynchronously deletes a cache entry by key.

    If the key does not exist, this method does nothing.

    Args:
        key (str): The cache key to delete.

    Notes:
        - Asyncio-safe.
        - The key is automatically namespaced.
    """
    k = self._make_key(key)
    async with self._async_lock:
        self._cache.pop(k, None)

aclear async

aclear()

Asynchronously removes all cache entries in the current namespace.

This method deletes all entries whose keys match the current namespace prefix.

Notes
  • Asyncio-safe.
  • Only entries in the current namespace are affected.
  • This operation can be expensive if the cache is large.
Source code in fast_cache/backends/memory.py
async def aclear(self) -> None:
    """
    Asynchronously removes all cache entries in the current namespace.

    This method deletes all entries whose keys match the current namespace prefix.

    Notes:
        - Asyncio-safe.
        - Only entries in the current namespace are affected.
        - This operation can be expensive if the cache is large.
    """
    prefix = f"{self._namespace}:"
    async with self._async_lock:
        keys_to_delete = [k for k in self._cache if k.startswith(prefix)]
        for k in keys_to_delete:
            self._cache.pop(k, None)

ahas async

ahas(key)

Asynchronously checks if a cache key exists and is not expired.

Parameters:
  • key (str, required): The cache key to check.

Returns:
  • bool: True if the key exists and is not expired, False otherwise.

Notes
  • Asyncio-safe.
  • Expired entries are not considered present and are removed on check.
  • Updates LRU order on access.
Source code in fast_cache/backends/memory.py
async def ahas(self, key: str) -> bool:
    """
    Asynchronously checks if a cache key exists and is not expired.

    Args:
        key (str): The cache key to check.

    Returns:
        bool: True if the key exists and is not expired, False otherwise.

    Notes:
        - Asyncio-safe.
        - Expired entries are not considered present and are removed on check.
        - Updates LRU order on access.
    """
    k = self._make_key(key)
    async with self._async_lock:
        item = self._cache.get(k)
        if item:
            _, expire_time = item
            if not self._is_expired(expire_time):
                self._cache.move_to_end(k)
                return True
            self._cache.pop(k, None)
        return False
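
The same round trip with the asynchronous API (a sketch):

import asyncio

async def demo() -> None:
    await backend.aset("greeting", "hello", expire=30)
    assert await backend.ahas("greeting")
    print(await backend.aget("greeting"))  # hello
    await backend.adelete("greeting")

asyncio.run(demo())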

close

close()

Closes the backend and stops the background cleanup scheduler.

This method should be called when the backend is no longer needed to ensure all resources are released and background jobs are stopped.

Notes
  • After calling this method, the cache is cleared and cannot be used.
  • The background cleanup scheduler is stopped.
Source code in fast_cache/backends/memory.py
def close(self) -> None:
    """
    Closes the backend and stops the background cleanup scheduler.

    This method should be called when the backend is no longer needed to ensure
    all resources are released and background jobs are stopped.

    Notes:
        - After calling this method, the cache is cleared and cannot be used.
        - The background cleanup scheduler is stopped.
    """
    self._stop_cleanup_scheduler()
    self._cache = None

fast_cache.RedisBackend

RedisBackend(redis_url, namespace='fastapi-cache', pool_size=10, max_connections=20)

Bases: CacheBackend

Redis cache backend implementation with namespace support.

Attributes:
  • _namespace (str): Namespace prefix for all keys.
  • _sync_pool (ConnectionPool): Synchronous Redis connection pool.
  • _async_pool (ConnectionPool): Asynchronous Redis connection pool.
  • _sync_client (Redis): Synchronous Redis client.
  • _async_client (Redis): Asynchronous Redis client.

Initialize Redis backend with connection URL and pool settings.

Parameters:
  • redis_url (str, required): Redis connection URL (e.g., "redis://localhost:6379/0").
  • namespace (str, default "fastapi-cache"): Namespace prefix for all keys.
  • pool_size (int, default 10): Minimum number of connections in the pool.
  • max_connections (int, default 20): Maximum number of connections in the pool.
Source code in fast_cache/backends/redis.py
def __init__(
    self,
    redis_url: str,
    namespace: str = "fastapi-cache",
    pool_size: int = 10,
    max_connections: int = 20,
) -> None:
    """
    Initialize Redis backend with connection URL and pool settings.

    Args:
        redis_url (str): Redis connection URL (e.g., "redis://localhost:6379/0").
        namespace (str): Namespace prefix for all keys (default: "fastapi-cache").
        pool_size (int): Minimum number of connections in the pool.
        max_connections (int): Maximum number of connections in the pool.
    """

    try:
        import redis.asyncio as aioredis
        import redis
    except ImportError:
        raise ImportError(
            "RedisBackend requires the 'redis' package. "
            "Install it with: pip install fast-cache[redis]"
        )

    self._namespace = namespace
    self._sync_pool = redis.ConnectionPool.from_url(
        redis_url, max_connections=max_connections, decode_responses=False
    )

    self._async_pool = aioredis.ConnectionPool.from_url(
        redis_url,
        max_connections=max_connections,
        decode_responses=False,
        encoding="utf-8",
    )

    self._sync_client = redis.Redis(connection_pool=self._sync_pool)
    self._async_client = aioredis.Redis(connection_pool=self._async_pool)
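
Constructing the backend (URL and pool sizes are illustrative):

backend = RedisBackend(
    "redis://localhost:6379/0",
    namespace="my-app",
    pool_size=10,
    max_connections=20,
)

Values are serialized with pickle (see the get/set sources below), so only picklable objects can be cached.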

aget async

aget(key)

Asynchronously retrieve a value from the cache.

Parameters:
  • key (str, required): The key to retrieve.

Returns:
  • Optional[Any]: The cached value, or None if not found.

Source code in fast_cache/backends/redis.py
async def aget(self, key: str) -> Optional[Any]:
    """
    Asynchronously retrieve a value from the cache.

    Args:
        key (str): The key to retrieve.

    Returns:
        Optional[Any]: The cached value, or None if not found.
    """
    try:
        result = await self._async_client.get(self._make_key(key))
        return pickle.loads(result) if result else None
    except Exception:
        return None

get

get(key)

Synchronously retrieve a value from the cache.

Parameters:
  • key (str, required): The key to retrieve.

Returns:
  • Optional[Any]: The cached value, or None if not found.

Source code in fast_cache/backends/redis.py
def get(self, key: str) -> Optional[Any]:
    """
    Synchronously retrieve a value from the cache.

    Args:
        key (str): The key to retrieve.

    Returns:
        Optional[Any]: The cached value, or None if not found.
    """
    try:
        result = self._sync_client.get(self._make_key(key))
        return pickle.loads(result) if result else None
    except Exception:
        return None

aset async

aset(key, value, expire=None)

Asynchronously set a value in the cache.

Parameters:
  • key (str, required): The key under which to store the value.
  • value (Any, required): The value to store.
  • expire (Optional[Union[int, timedelta]], default None): Expiration time in seconds or as a timedelta.
Source code in fast_cache/backends/redis.py
async def aset(
    self, key: str, value: Any, expire: Optional[Union[int, timedelta]] = None
) -> None:
    """
    Asynchronously set a value in the cache.

    Args:
        key (str): The key under which to store the value.
        value (Any): The value to store.
        expire (Optional[Union[int, timedelta]]): Expiration time in seconds or as timedelta.
    """
    try:
        ex = expire.total_seconds() if isinstance(expire, timedelta) else expire
        await self._async_client.set(
            self._make_key(key), pickle.dumps(value), ex=ex
        )
    except Exception:
        pass

set

set(key, value, expire=None)

Synchronously set a value in the cache.

Parameters:
  • key (str, required): The key under which to store the value.
  • value (Any, required): The value to store.
  • expire (Optional[Union[int, timedelta]], default None): Expiration time in seconds or as a timedelta.
Source code in fast_cache/backends/redis.py
def set(
    self, key: str, value: Any, expire: Optional[Union[int, timedelta]] = None
) -> None:
    """
    Synchronously set a value in the cache.

    Args:
        key (str): The key under which to store the value.
        value (Any): The value to store.
        expire (Optional[Union[int, timedelta]]): Expiration time in seconds or as timedelta.
    """
    try:
        ex = expire.total_seconds() if isinstance(expire, timedelta) else expire
        self._sync_client.set(self._make_key(key), pickle.dumps(value), ex=ex)
    except Exception:
        pass

adelete async

adelete(key)

Asynchronously delete a value from the cache.

Parameters:
  • key (str, required): The key to delete.
Source code in fast_cache/backends/redis.py
async def adelete(self, key: str) -> None:
    """
    Asynchronously delete a value from the cache.

    Args:
        key (str): The key to delete.
    """
    try:
        await self._async_client.delete(self._make_key(key))
    except Exception:
        pass

delete

delete(key)

Synchronously delete a value from the cache.

Parameters:
  • key (str, required): The key to delete.
Source code in fast_cache/backends/redis.py
def delete(self, key: str) -> None:
    """
    Synchronously delete a value from the cache.

    Args:
        key (str): The key to delete.
    """
    try:
        self._sync_client.delete(self._make_key(key))
    except Exception:
        pass

aclear async

aclear()

Asynchronously clear all values from the namespace.

Source code in fast_cache/backends/redis.py
async def aclear(self) -> None:
    """
    Asynchronously clear all values from the namespace.
    """
    try:
        keys = await self._scan_keys()
        if keys:
            await self._async_client.delete(*keys)
    except Exception:
        pass

clear

clear()

Synchronously clear all values from the namespace.

Source code in fast_cache/backends/redis.py
def clear(self) -> None:
    """
    Synchronously clear all values from the namespace.
    """
    try:
        cursor = 0
        namespace_pattern = self._make_key("*")

        while True:
            cursor, keys = self._sync_client.scan(
                cursor=cursor, match=namespace_pattern, count=100
            )
            if keys:
                self._sync_client.delete(*keys)
            if cursor == 0:
                break
    except Exception:
        pass

ahas async

ahas(key)

Asynchronously check if a key exists in the cache.

Parameters:
  • key (str, required): The key to check.

Returns:
  • bool: True if the key exists, False otherwise.

Source code in fast_cache/backends/redis.py
async def ahas(self, key: str) -> bool:
    """
    Asynchronously check if a key exists in the cache.

    Args:
        key (str): The key to check.

    Returns:
        bool: True if the key exists, False otherwise.
    """
    try:
        return await self._async_client.exists(self._make_key(key)) > 0
    except Exception:
        return False

has

has(key)

Synchronously check if a key exists in the cache.

Parameters:
  • key (str, required): The key to check.

Returns:
  • bool: True if the key exists, False otherwise.

Source code in fast_cache/backends/redis.py
def has(self, key: str) -> bool:
    """
    Synchronously check if a key exists in the cache.

    Args:
        key (str): The key to check.

    Returns:
        bool: True if the key exists, False otherwise.
    """
    try:
        return self._sync_client.exists(self._make_key(key)) > 0
    except Exception:
        return False

close async

close()

Close Redis connections and clean up pools.

Source code in fast_cache/backends/redis.py
async def close(self) -> None:
    """
    Close Redis connections and clean up pools.
    """
    await self._async_client.close()
    await self._async_pool.disconnect()
    self._sync_client.close()
    self._sync_pool.disconnect()

fast_cache.PostgresBackend

PostgresBackend(dsn, namespace='fastapi', min_size=1, max_size=10, cleanup_interval=30, auto_cleanup=True)

Bases: CacheBackend

PostgreSQL cache backend implementation.

Uses an UNLOGGED TABLE for performance and lazy expiration.

Initializes a new instance of the PostgresBackend cache.

This backend uses a PostgreSQL database to store cache entries in an UNLOGGED TABLE for improved performance. It supports both synchronous and asynchronous operations, lazy expiration, and periodic cleanup of expired entries.

Parameters:
  • dsn (str, required): The PostgreSQL DSN (Data Source Name) string used to connect to the database.
  • namespace (str, default "fastapi"): A namespace prefix for all cache keys. This allows multiple independent caches to share the same database table. Only alphanumeric characters and underscores are allowed.
  • min_size (int, default 1): The minimum number of connections to maintain in the connection pool.
  • max_size (int, default 10): The maximum number of connections allowed in the connection pool.
  • cleanup_interval (int, default 30): The interval, in seconds, at which the background cleanup job runs to remove expired cache entries.
  • auto_cleanup (bool, default True): If True, automatically starts the background cleanup scheduler on initialization.

Raises:
  • ImportError: If the required psycopg[pool] package is not installed.
  • ValueError: If the provided namespace contains invalid characters.

Notes
  • The backend creates the cache table and an index on the expiration column if they do not already exist.
  • The cleanup scheduler can be started or stopped manually.
  • Both synchronous and asynchronous connection pools are managed.
Source code in fast_cache/backends/postgres.py
def __init__(
    self,
    dsn: str,
    namespace: str = "fastapi",
    min_size: int = 1,
    max_size: int = 10,
    cleanup_interval: int = 30,
    auto_cleanup: bool = True,
) -> None:
    """
    Initializes a new instance of the PostgresBackend cache.

    This backend uses a PostgreSQL database to store cache entries in an
    UNLOGGED TABLE for improved performance. It supports both synchronous and
    asynchronous operations, lazy expiration, and periodic cleanup of expired
    entries.

    Args:
        dsn (str): The PostgreSQL DSN (Data Source Name) string used to connect
            to the database.
        namespace (str, optional): A namespace prefix for all cache keys. This
            allows multiple independent caches to share the same database table.
            Only alphanumeric characters and underscores are allowed. Defaults to "fastapi".
        min_size (int, optional): The minimum number of connections to maintain
            in the connection pool. Defaults to 1.
        max_size (int, optional): The maximum number of connections allowed in
            the connection pool. Defaults to 10.
        cleanup_interval (int, optional): The interval, in seconds, at which the
            background cleanup job runs to remove expired cache entries.
            Defaults to 30 seconds.
        auto_cleanup (bool, optional): If True, automatically starts the
            background cleanup scheduler on initialization. Defaults to True.

    Raises:
        ImportError: If the required `psycopg[pool]` package is not installed.
        ValueError: If the provided namespace contains invalid characters.

    Notes:
        - The backend creates the cache table and an index on the expiration
          column if they do not already exist.
        - The cleanup scheduler can be started or stopped manually.
        - Both synchronous and asynchronous connection pools are managed.
    """
    try:
        from psycopg_pool import AsyncConnectionPool, ConnectionPool
    except ImportError:
        raise ImportError(
            "PostgresBackend requires the 'psycopg[pool]' package. "
            "Install it with: pip install fast-cache[postgres]"
        )

    self._namespace = _validate_namespace(namespace)
    self._table_name = f"{namespace}_cache_store"

    # The pools are opened on creation and will auto-reopen if needed
    # when using the context manager (`with/async with`).
    self._sync_pool = ConnectionPool(
        conninfo=dsn, min_size=min_size, max_size=max_size, open=True
    )
    self._async_pool = AsyncConnectionPool(
        conninfo=dsn, min_size=min_size, max_size=max_size, open=False
    )
    self._create_unlogged_table_if_not_exists()

    self._cleanup_interval = cleanup_interval
    self._auto_cleanup = auto_cleanup

    self._scheduler = None
    self._scheduler_lock = threading.Lock()

    if self._auto_cleanup:
        self._start_cleanup_scheduler()
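
Constructing the backend (the DSN is illustrative; note that the namespace may contain only alphanumeric characters and underscores):

backend = PostgresBackend(
    "postgresql://user:password@localhost:5432/mydb",
    namespace="my_app",
    min_size=1,
    max_size=10,
    cleanup_interval=30,
    auto_cleanup=True,
)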

set

set(key, value, expire=None)

Stores a value in the cache under the specified key.

If the key already exists, its value and expiration time are updated. Optionally, an expiration time can be set, after which the entry will be considered expired and eligible for deletion.

Parameters:
  • key (str, required): The cache key to store the value under.
  • value (Any, required): The Python object to cache; it is serialized using pickle.
  • expire (Optional[Union[int, timedelta]], default None): The expiration time for the cache entry. Can be specified as an integer (seconds) or a timedelta. If None, the entry does not expire.
Notes
  • The key is automatically namespaced.
  • Expired entries are lazily deleted on access or by the cleanup job.
Source code in fast_cache/backends/postgres.py
def set(
    self, key: str, value: Any, expire: Optional[Union[int, timedelta]] = None
) -> None:
    """
    Stores a value in the cache under the specified key.

    If the key already exists, its value and expiration time are updated.
    Optionally, an expiration time can be set, after which the entry will be
    considered expired and eligible for deletion.

    Args:
        key (str): The cache key to store the value under.
        value (Any): The Python object/values to cache. It will be serialized using pickle.
        expire (Optional[Union[int, timedelta]], optional): The expiration time
            for the cache entry. Can be specified as an integer (seconds) or a
            timedelta. If None, the entry does not expire.

    Notes:
        - The key is automatically namespaced.
        - Expired entries are lazily deleted on access or by the cleanup job.
    """
    expire_at = self._compute_expire_at(expire)
    with self._sync_pool.connection() as conn:
        with conn.cursor() as cur:
            cur.execute(
                f"""
                INSERT INTO {self._table_name} (key, value, expire_at)
                VALUES (%s, %s, %s)
                ON CONFLICT (key)
                DO UPDATE SET value = EXCLUDED.value,
                              expire_at = EXCLUDED.expire_at;
                """,
                (self._make_key(key), pickle.dumps(value), expire_at),
            )
            conn.commit()

get

get(key)

Retrieves a value from the cache by key.

If the key does not exist or the entry has expired, returns None. If the entry is expired, it is deleted from the cache (lazy deletion).

Parameters:
  • key (str, required): The cache key to retrieve.

Returns:
  • Optional[Any]: The cached Python object, or None if not found or expired.

Notes
  • The value is deserialized using pickle.
  • Expired entries are removed on access.
Source code in fast_cache/backends/postgres.py
def get(self, key: str) -> Optional[Any]:
    """
    Retrieves a value from the cache by key.

    If the key does not exist or the entry has expired, returns None. If the
    entry is expired, it is deleted from the cache (lazy deletion).

    Args:
        key (str): The cache key to retrieve.

    Returns:
        Optional[Any]: The cached Python object, or None if not found or expired.

    Notes:
        - The value is deserialized using pickle.
        - Expired entries are removed on access.
    """
    with self._sync_pool.connection() as conn:
        with conn.cursor() as cur:
            cur.execute(
                f"SELECT value, expire_at FROM {self._table_name} WHERE key = %s;",
                (self._make_key(key),),
            )
            row = cur.fetchone()
            if not row:
                return None
            value, expire_at = row
            if self._is_expired(expire_at):
                self.delete(key)  # Lazy delete
                return None
            return pickle.loads(value)

delete

delete(key)

Deletes a cache entry by key.

If the key does not exist, this method does nothing.

Parameters:
  • key (str, required): The cache key to delete.
Notes
  • The key is automatically namespaced.
Source code in fast_cache/backends/postgres.py
def delete(self, key: str) -> None:
    """
    Deletes a cache entry by key.

    If the key does not exist, this method does nothing.

    Args:
        key (str): The cache key to delete.

    Notes:
        - The key is automatically namespaced.
    """
    with self._sync_pool.connection() as conn:
        with conn.cursor() as cur:
            cur.execute(
                f"DELETE FROM {self._table_name} WHERE key = %s;",
                (self._make_key(key),),
            )
            conn.commit()

has

has(key)

Checks if a cache key exists and is not expired.

Parameters:
  • key (str, required): The cache key to check.

Returns:
  • bool: True if the key exists and is not expired, False otherwise.

Notes
  • Expired entries are not considered present.
  • Does not remove expired entries; use get for lazy deletion.
Source code in fast_cache/backends/postgres.py
def has(self, key: str) -> bool:
    """
    Checks if a cache key exists and is not expired.

    Args:
        key (str): The cache key to check.

    Returns:
        bool: True if the key exists and is not expired, False otherwise.

    Notes:
        - Expired entries are not considered present.
        - Does not remove expired entries; use `get` for lazy deletion.
    """
    with self._sync_pool.connection() as conn:
        with conn.cursor() as cur:
            cur.execute(
                f"SELECT expire_at FROM {self._table_name} WHERE key = %s;",
                (self._make_key(key),),
            )
            row = cur.fetchone()
            if not row:
                return False
            return not self._is_expired(row[0])

clear

clear()

Removes all cache entries in the current namespace.

This method deletes all rows from the cache table whose keys match the current namespace prefix.

Notes
  • Only entries in the current namespace are affected.
  • This operation can be expensive if the cache is large.
Source code in fast_cache/backends/postgres.py
def clear(self) -> None:
    """
    Removes all cache entries in the current namespace.

    This method deletes all rows from the cache table whose keys match the
    current namespace prefix.

    Notes:
        - Only entries in the current namespace are affected.
        - This operation can be expensive if the cache is large.
    """

    with self._sync_pool.connection() as conn:
        with conn.cursor() as cur:
            # Use the namespaced table and key prefix so only this cache's rows are removed
            cur.execute(
                f"DELETE FROM {self._table_name} WHERE key LIKE %s;",
                (self._make_key("%"),),
            )
            conn.commit()

aset async

aset(key, value, expire=None)

Asynchronously stores a value in the cache under the specified key.

If the key already exists, its value and expiration time are updated. Optionally, an expiration time can be set, after which the entry will be considered expired and eligible for deletion.

Parameters:
  • key (str, required): The cache key to store the value under.
  • value (Any, required): The Python object to cache; it is serialized using pickle.
  • expire (Optional[Union[int, timedelta]], default None): The expiration time for the cache entry. Can be specified as an integer (seconds) or a timedelta. If None, the entry does not expire.
Notes
  • Uses the asynchronous connection pool.
  • The key is automatically namespaced.
Source code in fast_cache/backends/postgres.py
async def aset(
    self, key: str, value: Any, expire: Optional[Union[int, timedelta]] = None
) -> None:
    """
    Asynchronously stores a value in the cache under the specified key.

    If the key already exists, its value and expiration time are updated.
    Optionally, an expiration time can be set, after which the entry will be
    considered expired and eligible for deletion.

    Args:
        key (str): The cache key to store the value under.
        value (Any): The Python object/values to cache. It will be serialized using pickle.
        expire (Optional[Union[int, timedelta]], optional): The expiration time
            for the cache entry. Can be specified as an integer (seconds) or a
            timedelta. If None, the entry does not expire.

    Notes:
        - Uses the asynchronous connection pool.
        - The key is automatically namespaced.
    """
    await self._ensure_async_pool_open()
    expire_at = self._compute_expire_at(expire)
    async with self._async_pool.connection() as conn:
        async with conn.cursor() as cur:
            await cur.execute(
                f"""
                INSERT INTO {self._table_name} (key, value, expire_at)
                VALUES (%s, %s, %s)
                ON CONFLICT (key)
                DO UPDATE SET value = EXCLUDED.value,
                              expire_at = EXCLUDED.expire_at;
                """,
                (self._make_key(key), pickle.dumps(value), expire_at),
            )
            await conn.commit()

aget async

aget(key)

Asynchronously retrieves a value from the cache by key.

If the key does not exist or the entry has expired, returns None. If the entry is expired, it is deleted from the cache (lazy deletion).

Parameters:
  • key (str, required): The cache key to retrieve.

Returns:
  • Optional[Any]: The cached Python object, or None if not found or expired.

Notes
  • Uses the asynchronous connection pool.
  • The value is deserialized using pickle.
  • Expired entries are removed on access.
Source code in fast_cache/backends/postgres.py
async def aget(self, key: str) -> Optional[Any]:
    """
    Asynchronously retrieves a value from the cache by key.

    If the key does not exist or the entry has expired, returns None. If the
    entry is expired, it is deleted from the cache (lazy deletion).

    Args:
        key (str): The cache key to retrieve.

    Returns:
        Optional[Any]: The cached Python object, or None if not found or expired.

    Notes:
        - Uses the asynchronous connection pool.
        - The value is deserialized using pickle.
        - Expired entries are removed on access.
    """
    await self._ensure_async_pool_open()
    async with self._async_pool.connection() as conn:
        async with conn.cursor() as cur:
            await cur.execute(
                f"SELECT value, expire_at FROM {self._table_name} WHERE key = %s;",
                (self._make_key(key),),
            )
            row = await cur.fetchone()
            if not row:
                return None
            value, expire_at = row
            if self._is_expired(expire_at):
                await self.adelete(key)  # Lazy delete
                return None
            return pickle.loads(value)

adelete async

adelete(key)

Asynchronously deletes a cache entry by key.

If the key does not exist, this method does nothing.

Parameters:
  • key (str, required): The cache key to delete.
Notes
  • Uses the asynchronous connection pool.
  • The key is automatically namespaced.
Source code in fast_cache/backends/postgres.py
async def adelete(self, key: str) -> None:
    """
    Asynchronously deletes a cache entry by key.

    If the key does not exist, this method does nothing.

    Args:
        key (str): The cache key to delete.

    Notes:
        - Uses the asynchronous connection pool.
        - The key is automatically namespaced.
    """
    await self._ensure_async_pool_open()
    async with self._async_pool.connection() as conn:
        async with conn.cursor() as cur:
            await cur.execute(
                f"DELETE FROM {self._table_name} WHERE key = %s;",
                (self._make_key(key),),
            )
            await conn.commit()

ahas async

ahas(key)

Asynchronously checks if a cache key exists and is not expired.

Parameters:
  • key (str, required): The cache key to check.

Returns:
  • bool: True if the key exists and is not expired, False otherwise.

Notes
  • Uses the asynchronous connection pool.
  • Expired entries are not considered present.
  • Does not remove expired entries; use aget for lazy deletion.
Source code in fast_cache/backends/postgres.py
async def ahas(self, key: str) -> bool:
    """
    Asynchronously checks if a cache key exists and is not expired.

    Args:
        key (str): The cache key to check.

    Returns:
        bool: True if the key exists and is not expired, False otherwise.

    Notes:
        - Uses the asynchronous connection pool.
        - Expired entries are not considered present.
        - Does not remove expired entries; use `aget` for lazy deletion.
    """
    await self._ensure_async_pool_open()
    async with self._async_pool.connection() as conn:
        async with conn.cursor() as cur:
            await cur.execute(
                f"SELECT expire_at FROM {self._table_name} WHERE key = %s;",
                (self._make_key(key),),
            )
            row = await cur.fetchone()
            if not row:
                return False
            return not self._is_expired(row[0])

aclear async

aclear()

Asynchronously removes all cache entries in the current namespace.

This method deletes all rows from the cache table whose keys match the current namespace prefix.

Notes
  • Uses the asynchronous connection pool.
  • Only entries in the current namespace are affected.
  • This operation can be expensive if the cache is large.
Source code in fast_cache/backends/postgres.py
async def aclear(self) -> None:
    """
    Asynchronously removes all cache entries in the current namespace.

    This method deletes all rows from the cache table whose keys match the
    current namespace prefix.

    Notes:
        - Uses the asynchronous connection pool.
        - Only entries in the current namespace are affected.
        - This operation can be expensive if the cache is large.
    """
    await self._ensure_async_pool_open()
    async with self._async_pool.connection() as conn:
        async with conn.cursor() as cur:
            # Use the namespaced table and key prefix so only this cache's rows are removed
            await cur.execute(
                f"DELETE FROM {self._table_name} WHERE key LIKE %s;",
                (self._make_key("%"),),
            )
            await conn.commit()

aclose async

aclose()

Asynchronously closes the connection pools and stops the cleanup scheduler.

This method should be called when the backend is no longer needed to ensure all resources are released and background jobs are stopped.

Notes
  • Closes both synchronous and asynchronous connection pools.
  • Stops the background cleanup scheduler.
Source code in fast_cache/backends/postgres.py
async def aclose(self) -> None:
    """
    Asynchronously closes the connection pools and stops the cleanup scheduler.

    This method should be called when the backend is no longer needed to ensure
    all resources are released and background jobs are stopped.

    Notes:
        - Closes both synchronous and asynchronous connection pools.
        - Stops the background cleanup scheduler.
    """
    self._stop_cleanup_scheduler()
    if self._sync_pool:
        self._sync_pool.close()

    if self._async_pool:
        await self._async_pool.close()

close

close()

Closes the synchronous connection pool and stops the cleanup scheduler.

This method should be called when the backend is no longer needed to ensure all resources are released and background jobs are stopped.

Notes
  • Only closes the synchronous connection pool.
  • Stops the background cleanup scheduler.
Source code in fast_cache/backends/postgres.py
def close(self) -> None:
    """
    Closes the synchronous connection pool and stops the cleanup scheduler.

    This method should be called when the backend is no longer needed to ensure
    all resources are released and background jobs are stopped.

    Notes:
        - Only closes the synchronous connection pool.
        - Stops the background cleanup scheduler.
    """
    self._stop_cleanup_scheduler()
    if self._sync_pool:
        self._sync_pool.close()

fast_cache.MemcachedBackend

MemcachedBackend(host, port, *, pool_size=10, pool_minsize=1, namespace='fastapi_cache')

Bases: CacheBackend

Initializes a new instance of the MemcachedBackend cache.

This backend provides a cache using Memcached as the storage layer. It supports both synchronous and asynchronous operations, and uses a namespace prefix for all keys to avoid collisions.

Parameters:
  • host (str, required): The hostname or IP address of the Memcached server.
  • port (int, required): The port number of the Memcached server.
  • pool_size (int, default 10): The maximum number of connections in the async pool.
  • pool_minsize (int, default 1): The minimum number of connections in the async pool.
  • namespace (str, default "fastapi_cache"): Prefix for all cache keys.

Raises:
  • ImportError: If the required aiomcache or pymemcache packages are not installed.

Notes
  • Both synchronous and asynchronous Memcached clients are initialized.
  • The async client is created per event loop.
  • All cache keys are automatically namespaced.
Source code in fast_cache/backends/memcached.py
def __init__(
    self,
    host: str,
    port: int,
    *,
    pool_size: int = 10,
    pool_minsize: int = 1,
    namespace: str = "fastapi_cache",
) -> None:
    try:
        import aiomcache
        from pymemcache.client.base import PooledClient
    except ImportError:
        raise ImportError(
            "MemcachedBackend requires 'aiomcache' and 'pymemcache'. "
            "Install with: pip install fast-cache[memcached]"
        )
    self._namespace = namespace
    self._host = host
    self._port = port

    # Sync client
    self._sync_client = PooledClient(
        (host, port),
        max_pool_size=10,
    )
    self._async_client = aiomcache.Client(
        host,
        port,
        pool_size=pool_size,
        pool_minsize=pool_minsize,
    )
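
Constructing the backend (host and port are illustrative; 11211 is the conventional Memcached port):

backend = MemcachedBackend(
    "localhost",
    11211,
    pool_size=10,
    pool_minsize=1,
    namespace="fastapi_cache",
)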

get

get(key)

Synchronously retrieves a value from the cache by key.

If the key does not exist, returns None.

Parameters:
  • key (str, required): The cache key to retrieve.

Returns:
  • Optional[Any]: The cached Python object, or None if not found.

Notes
  • The value is deserialized using pickle.
  • Handles deserialization errors gracefully.
  • Thread-safe for Memcached client.
Source code in fast_cache/backends/memcached.py
def get(self, key: str) -> Optional[Any]:
    """
    Synchronously retrieves a value from the cache by key.

    If the key does not exist, returns None.

    Args:
        key (str): The cache key to retrieve.

    Returns:
        Optional[Any]: The cached Python object, or None if not found.

    Notes:
        - The value is deserialized using pickle.
        - Handles deserialization errors gracefully.
        - Thread-safe for Memcached client.
    """
    try:
        value = self._sync_client.get(self._make_key(key))
        return pickle.loads(value) if value else None
    except Exception:
        return None

set

set(key, value, expire=None)

Synchronously stores a value in the cache under the specified key.

If the key already exists, its value and expiration time are updated. Optionally, an expiration time can be set, after which the entry will be considered expired and eligible for deletion.

Parameters:

    key (str): The cache key to store the value under. Required.
    value (Any): The Python object to cache. Required.
    expire (Optional[Union[int, timedelta]]): The expiration time for the cache entry, as an integer (seconds) or a timedelta. If None, the entry does not expire. Defaults to None.
Notes
  • The value is serialized using pickle.
  • Thread-safe for Memcached client.
  • Expiration is handled by Memcached.
Source code in fast_cache/backends/memcached.py
def set(
    self, key: str, value: Any, expire: Optional[Union[int, timedelta]] = None
) -> None:
    """
    Synchronously stores a value in the cache under the specified key.

    If the key already exists, its value and expiration time are updated.
    Optionally, an expiration time can be set, after which the entry will be
    considered expired and eligible for deletion.

    Args:
        key (str): The cache key to store the value under.
        value (Any): The Python object to cache.
        expire (Optional[Union[int, timedelta]], optional): The expiration time
            for the cache entry. Can be specified as an integer (seconds) or a
            timedelta. If None, the entry does not expire.

    Notes:
        - The value is serialized using pickle.
        - Thread-safe for Memcached client.
        - Expiration is handled by Memcached.
    """
    try:
        exptime = (
            int(expire.total_seconds())
            if isinstance(expire, timedelta)
            else (expire or 0)
        )
        self._sync_client.set(
            self._make_key(key), pickle.dumps(value), expire=exptime
        )
    except Exception:
        pass
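
Both expire forms below are equivalent, since a timedelta is converted to whole seconds (illustrative key and value):

    from datetime import timedelta

    backend.set("session:abc", "payload", expire=300)
    backend.set("session:abc", "payload", expire=timedelta(minutes=5))
    # expire=None maps to exptime 0, which Memcached treats as "never expire"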

delete

delete(key)

Synchronously deletes a cache entry by key.

If the key does not exist, this method does nothing.

Parameters:

    key (str): The cache key to delete. Required.
Notes
  • Thread-safe for Memcached client.
  • The key is automatically namespaced.
Source code in fast_cache/backends/memcached.py
def delete(self, key: str) -> None:
    """
    Synchronously deletes a cache entry by key.

    If the key does not exist, this method does nothing.

    Args:
        key (str): The cache key to delete.

    Notes:
        - Thread-safe for Memcached client.
        - The key is automatically namespaced.
    """
    try:
        self._sync_client.delete(self._make_key(key))
    except Exception:
        pass

clear

clear()

Synchronously removes all cache entries from Memcached.

Memcached does not support namespace-based clearing, so this operation flushes the entire cache, removing all entries regardless of namespace.

Notes
  • Thread-safe for Memcached client.
  • This operation affects all keys in the Memcached instance.
  • Use with caution in shared environments.
Source code in fast_cache/backends/memcached.py
def clear(self) -> None:
    """
    Synchronously removes all cache entries from Memcached.

    Memcached does not support namespace-based clearing, so this operation flushes
    the entire cache, removing all entries regardless of namespace.

    Notes:
        - Thread-safe for Memcached client.
        - This operation affects all keys in the Memcached instance.
        - Use with caution in shared environments.
    """

    try:
        self._sync_client.flush_all()
    except Exception:
        pass

has

has(key)

Synchronously checks if a cache key exists.

Parameters:

    key (str): The cache key to check. Required.

Returns:

    bool: True if the key exists, False otherwise.

Notes
  • Thread-safe for Memcached client.
  • Expired entries are not considered present.
Source code in fast_cache/backends/memcached.py
def has(self, key: str) -> bool:
    """
    Synchronously checks if a cache key exists.

    Args:
        key (str): The cache key to check.

    Returns:
        bool: True if the key exists, False otherwise.

    Notes:
        - Thread-safe for Memcached client.
        - Expired entries are not considered present.
    """
    try:
        return self._sync_client.get(self._make_key(key)) is not None
    except Exception:
        return False

aget async

aget(key)

Asynchronously retrieves a value from the cache by key.

If the key does not exist, returns None.

Parameters:

    key (str): The cache key to retrieve. Required.

Returns:

    Optional[Any]: The cached Python object, or None if not found.

Notes
  • The value is deserialized using pickle.
  • Handles deserialization errors gracefully.
  • Asyncio-safe for Memcached client.
Source code in fast_cache/backends/memcached.py
async def aget(self, key: str) -> Optional[Any]:
    """
    Asynchronously retrieves a value from the cache by key.

    If the key does not exist, returns None.

    Args:
        key (str): The cache key to retrieve.

    Returns:
        Optional[Any]: The cached Python object, or None if not found.

    Notes:
        - The value is deserialized using pickle.
        - Handles deserialization errors gracefully.
        - Asyncio-safe for Memcached client.
    """
    try:
        value = await self._async_client.get(self._make_key(key))
        return pickle.loads(value) if value else None
    except Exception:
        return None

aset async

aset(key, value, expire=None)

Asynchronously stores a value in the cache under the specified key.

If the key already exists, its value and expiration time are updated. Optionally, an expiration time can be set, after which the entry will be considered expired and eligible for deletion.

Parameters:

    key (str): The cache key to store the value under. Required.
    value (Any): The Python object to cache. Required.
    expire (Optional[Union[int, timedelta]]): The expiration time for the cache entry, as an integer (seconds) or a timedelta. If None, the entry does not expire. Defaults to None.
Notes
  • The value is serialized using pickle.
  • Asyncio-safe for Memcached client.
  • Expiration is handled by Memcached.
Source code in fast_cache/backends/memcached.py
async def aset(
    self, key: str, value: Any, expire: Optional[Union[int, timedelta]] = None
) -> None:
    """
    Asynchronously stores a value in the cache under the specified key.

    If the key already exists, its value and expiration time are updated.
    Optionally, an expiration time can be set, after which the entry will be
    considered expired and eligible for deletion.

    Args:
        key (str): The cache key to store the value under.
        value (Any): The Python object to cache.
        expire (Optional[Union[int, timedelta]], optional): The expiration time
            for the cache entry. Can be specified as an integer (seconds) or a
            timedelta. If None, the entry does not expire.

    Notes:
        - The value is serialized using pickle.
        - Asyncio-safe for Memcached client.
        - Expiration is handled by Memcached.
    """
    try:
        exptime = (
            int(expire.total_seconds())
            if isinstance(expire, timedelta)
            else (expire or 0)
        )
        await self._async_client.set(
            self._make_key(key), pickle.dumps(value), exptime=exptime
        )
    except Exception:
        pass

adelete async

adelete(key)

Asynchronously deletes a cache entry by key.

If the key does not exist, this method does nothing.

Parameters:

    key (str): The cache key to delete. Required.
Notes
  • Asyncio-safe for Memcached client.
  • The key is automatically namespaced.
Source code in fast_cache/backends/memcached.py
async def adelete(self, key: str) -> None:
    """
    Asynchronously deletes a cache entry by key.

    If the key does not exist, this method does nothing.

    Args:
        key (str): The cache key to delete.

    Notes:
        - Asyncio-safe for Memcached client.
        - The key is automatically namespaced.
    """
    try:
        await self._async_client.delete(self._make_key(key))
    except Exception:
        pass

aclear async

aclear()

Asynchronously removes all cache entries from Memcached.

Memcached does not support namespace-based clearing, so this operation flushes the entire cache, removing all entries regardless of namespace.

Notes
  • Asyncio-safe for Memcached client.
  • This operation affects all keys in the Memcached instance.
  • Use with caution in shared environments.
Source code in fast_cache/backends/memcached.py
async def aclear(self) -> None:
    """
    Asynchronously removes all cache entries from Memcached.

    Memcached does not support namespace-based clearing, so this operation flushes
    the entire cache, removing all entries regardless of namespace.

    Notes:
        - Asyncio-safe for Memcached client.
        - This operation affects all keys in the Memcached instance.
        - Use with caution in shared environments.
    """
    try:
        await self._async_client.flush_all()
    except Exception:
        pass

ahas async

ahas(key)

Asynchronously checks if a cache key exists.

Parameters:

    key (str): The cache key to check. Required.

Returns:

    bool: True if the key exists, False otherwise.

Notes
  • Asyncio-safe for Memcached client.
  • Expired entries are not considered present.
Source code in fast_cache/backends/memcached.py
async def ahas(self, key: str) -> bool:
    """
    Asynchronously checks if a cache key exists.

    Args:
        key (str): The cache key to check.

    Returns:
        bool: True if the key exists, False otherwise.

    Notes:
        - Asyncio-safe for Memcached client.
        - Expired entries are not considered present.
    """
    try:
        value = await self._async_client.get(self._make_key(key))
        return value is not None
    except Exception:
        return False

close async

close()

Asynchronously closes both the async and sync Memcached clients.

This method should be called when the backend is no longer needed to ensure all resources are released.

Notes
  • After calling this method, the backend cannot be used.
  • Closes both the async and sync clients.
Source code in fast_cache/backends/memcached.py
async def close(self) -> None:
    """
    Asynchronously closes both the async and sync Memcached clients.

    This method should be called when the backend is no longer needed to ensure
    all resources are released.

    Notes:
        - After calling this method, the backend cannot be used.
        - Closes both the async and sync clients.
    """
    try:
        await self._async_client.close()
        self._sync_client.close()
    except Exception:
        pass

fast_cache.MongoDBBackend

MongoDBBackend(uri, namespace='fastapi_cache')

Bases: CacheBackend

MongoDB cache backend with both sync and async support. Uses a TTL index for automatic expiration of cache entries.

Each cache entry is stored as a document with
  • _id: the cache key (optionally namespaced)
  • value: the pickled cached value
  • expires_at: epoch time when the entry should expire

Expired documents are deleted automatically by MongoDB's TTL monitor, but expiration is also checked in code to avoid returning stale data.
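
For illustration, an entry stored by this backend looks roughly like this (values are hypothetical):

    {
        "_id": "fastapi_cache:user:42",  # namespaced cache key
        "value": b"\x80\x04...",         # pickled payload
        "expires_at": 1735689600.0,      # epoch seconds; absent if no expiry was set
    }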

Initialize the MongoDB backend.

Parameters:

    uri (str): MongoDB connection URI (should include the database name). Required.
    namespace (Optional[str]): Optional prefix for all cache keys and the collection name. Defaults to "fastapi_cache".

Raises:

    ImportError: If pymongo is not installed.

Source code in fast_cache/backends/mongodb.py
def __init__(self, uri: str, namespace: Optional[str] = "fastapi_cache") -> None:
    """
    Initialize the MongoDB backend.

    Args:
        uri (str): MongoDB connection URI (should include the database name).
        namespace (Optional[str]): Optional prefix for all cache keys and the collection name.
                                   Defaults to "fastapi_cache".
    Raises:
        ImportError: If pymongo is not installed.
    """
    try:
        import pymongo
    except ImportError:
        raise ImportError(
            "MongoDBBackend requires 'pymongo>=4.6.0'. "
            "Install with: pip install fastapi-cachekit[mongodb]"
        )
    self._namespace = namespace or "cache"

    # Sync client
    self._sync_client = pymongo.MongoClient(uri)
    self._sync_db = self._sync_client.get_default_database()
    self._sync_collection = self._sync_db[self._namespace]
    # TTL index so MongoDB's TTL monitor can remove expired entries automatically
    self._sync_collection.create_index("expires_at", expireAfterSeconds=0)

    # Async client
    self._async_client = pymongo.AsyncMongoClient(uri)
    self._async_db = self._async_client.get_default_database()
    self._async_collection = self._async_db[self._namespace]
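
Example usage (a minimal sketch; assumes a reachable MongoDB instance and a database name in the URI):

    from fast_cache import MongoDBBackend

    backend = MongoDBBackend("mongodb://localhost:27017/appdb", namespace="myapp")
    backend.set("user:42", {"name": "alice"}, expire=120)
    assert backend.has("user:42")
    backend.delete("user:42")
    backend.close()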

get

get(key)

Synchronously retrieve a value from the cache.

Parameters:

    key (str): The cache key. Required.

Returns:

    Optional[Any]: The cached value, or None if not found or expired.

Source code in fast_cache/backends/mongodb.py
def get(self, key: str) -> Optional[Any]:
    """
    Synchronously retrieve a value from the cache.

    Args:
        key (str): The cache key.

    Returns:
        Optional[Any]: The cached value, or None if not found or expired.
    """
    doc = self._sync_collection.find_one({"_id": self._make_key(key)})
    if doc and (doc.get("expires_at", float("inf")) > time.time()):
        try:
            return pickle.loads(doc["value"])
        except Exception:
            return None
    return None

set

set(key, value, expire=None)

Synchronously set a value in the cache.

Parameters:

    key (str): The cache key. Required.
    value (Any): The value to cache. Required.
    expire (Optional[Union[int, timedelta]]): Expiration time in seconds or as timedelta. If None, the entry never expires. Defaults to None.
Source code in fast_cache/backends/mongodb.py
def set(
    self, key: str, value: Any, expire: Optional[Union[int, timedelta]] = None
) -> None:
    """
    Synchronously set a value in the cache.

    Args:
        key (str): The cache key.
        value (Any): The value to cache.
        expire (Optional[Union[int, timedelta]]): Expiration time in seconds or as timedelta.
                                                 If None, the entry never expires.
    """
    update = {"value": pickle.dumps(value)}
    exptime = self._compute_expire_at(expire)
    if exptime is not None:
        update["expires_at"] = exptime

    self._sync_collection.update_one(
        {"_id": self._make_key(key)}, {"$set": update}, upsert=True
    )

delete

delete(key)

Synchronously delete a value from the cache.

Parameters:

    key (str): The cache key. Required.
Source code in fast_cache/backends/mongodb.py
def delete(self, key: str) -> None:
    """
    Synchronously delete a value from the cache.

    Args:
        key (str): The cache key.
    """
    self._sync_collection.delete_one({"_id": self._make_key(key)})

clear

clear()

Synchronously clear all values from the namespace.

Source code in fast_cache/backends/mongodb.py
def clear(self) -> None:
    """
    Synchronously clear all values from the namespace.
    """
    self._sync_collection.delete_many({"_id": {"$regex": f"^{self._namespace}:"}})

has

has(key)

Synchronously check if a key exists in the cache.

Parameters:

    key (str): The cache key. Required.

Returns:

    bool: True if the key exists and is not expired, False otherwise.

Source code in fast_cache/backends/mongodb.py
def has(self, key: str) -> bool:
    """
    Synchronously check if a key exists in the cache.

    Args:
        key (str): The cache key.

    Returns:
        bool: True if the key exists and is not expired, False otherwise.
    """
    doc = self._sync_collection.find_one({"_id": self._make_key(key)})
    return bool(doc and (doc.get("expires_at", float("inf")) > time.time()))

aget async

aget(key)

Asynchronously retrieve a value from the cache.

Parameters:

    key (str): The cache key. Required.

Returns:

    Optional[Any]: The cached value, or None if not found or expired.

Source code in fast_cache/backends/mongodb.py
async def aget(self, key: str) -> Optional[Any]:
    """
    Asynchronously retrieve a value from the cache.

    Args:
        key (str): The cache key.

    Returns:
        Optional[Any]: The cached value, or None if not found or expired.
    """
    doc = await self._async_collection.find_one({"_id": self._make_key(key)})
    if doc and (doc.get("expires_at", float("inf")) > time.time()):
        try:
            return pickle.loads(doc["value"])
        except Exception:
            return None
    return None

aset async

aset(key, value, expire=None)

Asynchronously set a value in the cache.

Parameters:

    key (str): The cache key. Required.
    value (Any): The value to cache. Required.
    expire (Optional[Union[int, timedelta]]): Expiration time in seconds or as timedelta. If None, the entry never expires. Defaults to None.
Source code in fast_cache/backends/mongodb.py
async def aset(
    self, key: str, value: Any, expire: Optional[Union[int, timedelta]] = None
) -> None:
    """
    Asynchronously set a value in the cache.

    Args:
        key (str): The cache key.
        value (Any): The value to cache.
        expire (Optional[Union[int, timedelta]]): Expiration time in seconds or as timedelta.
                                                 If None, the entry never expires.
    """
    update = {"value": pickle.dumps(value)}
    exptime = self._compute_expire_at(expire)
    if exptime is not None:
        update["expires_at"] = exptime

    await self._async_collection.update_one(
        {"_id": self._make_key(key)}, {"$set": update}, upsert=True
    )

adelete async

adelete(key)

Asynchronously delete a value from the cache.

Parameters:

    key (str): The cache key. Required.
Source code in fast_cache/backends/mongodb.py
async def adelete(self, key: str) -> None:
    """
    Asynchronously delete a value from the cache.

    Args:
        key (str): The cache key.
    """
    await self._async_collection.delete_one({"_id": self._make_key(key)})

aclear async

aclear()

Asynchronously clear all values from the namespace.

Source code in fast_cache/backends/mongodb.py
async def aclear(self) -> None:
    """
    Asynchronously clear all values from the namespace.
    """
    await self._async_collection.delete_many(
        {"_id": {"$regex": f"^{self._namespace}:"}}
    )

ahas async

ahas(key)

Asynchronously check if a key exists in the cache.

Parameters:

    key (str): The cache key. Required.

Returns:

    bool: True if the key exists and is not expired, False otherwise.

Source code in fast_cache/backends/mongodb.py
async def ahas(self, key: str) -> bool:
    """
    Asynchronously check if a key exists in the cache.

    Args:
        key (str): The cache key.

    Returns:
        bool: True if the key exists and is not expired, False otherwise.
    """
    doc = await self._async_collection.find_one({"_id": self._make_key(key)})
    return bool(doc and (doc.get("expires_at", float("inf")) > time.time()))

close

close()

Close the synchronous MongoDB client.

Source code in fast_cache/backends/mongodb.py
def close(self) -> None:
    """
    Close the synchronous MongoDB client.
    """
    self._sync_client.close()

aclose async

aclose()

Close both the synchronous and asynchronous MongoDB clients.

Source code in fast_cache/backends/mongodb.py
async def aclose(self) -> None:
    """
    Close both the synchronous and asynchronous MongoDB clients.
    """
    self._sync_client.close()
    await self._async_client.close()

fast_cache.FirestoreBackend

FirestoreBackend(credential_path=None, namespace='fastapi_cache', collection_name='cache_entries', cleanup_interval=30, auto_cleanup=True)

Bases: CacheBackend

Initializes a new instance of the FirestoreBackend cache.

This backend provides a cache using Google Cloud Firestore as the storage layer. It supports both synchronous and asynchronous operations, manual expiration management, and optional periodic cleanup of expired entries.

Parameters:

    credential_path (Optional[str]): Path to the Firebase Admin SDK credentials file. If None, uses the GOOGLE_APPLICATION_CREDENTIALS environment variable. Defaults to None.
    namespace (Optional[str]): Optional prefix for all cache keys. Defaults to "fastapi_cache".
    collection_name (Optional[str]): Name of the Firestore collection to use for storing cache entries. Defaults to "cache_entries".
    cleanup_interval (int): Interval in seconds for periodic cleanup of expired entries. Defaults to 30.
    auto_cleanup (bool): Whether to automatically start the cleanup scheduler on initialization. Defaults to True.

Raises:

    ImportError: If the required google-cloud-firestore package is not installed.

Notes
  • The backend uses a hashed, namespaced key for each Firestore document.
  • Expired entries are managed via a custom expires_at field.
  • Both synchronous and asynchronous Firestore clients are initialized.
  • The cleanup scheduler can be started or stopped manually.
Source code in fast_cache/backends/google_firestore.py
def __init__(
    self,
    credential_path: Optional[str] = None,
    namespace: Optional[str] = "fastapi_cache",
    collection_name: Optional[str] = "cache_entries",
    cleanup_interval: int = 30,
    auto_cleanup: bool = True,
) -> None:
    try:
        from google.oauth2 import service_account
        from google.cloud import firestore
        from google.cloud.firestore_v1.async_client import AsyncClient
        from google.cloud.firestore_v1.client import Client
    except ImportError:
        raise ImportError(
            "FirestoreBackend requires 'google-cloud-firestore'. "
            "Install with: pip install fastapi-cachekit[firestore]"
        )

    self._namespace = namespace or "cache"
    self._collection_name = collection_name or "cache_entries"

    self._cleanup_task = None
    self._cleanup_interval = cleanup_interval
    self._auto_cleanup = auto_cleanup

    self._scheduler = None
    self._scheduler_lock = threading.Lock()

    if credential_path:
        # Explicitly load credentials from the provided path
        credentials = service_account.Credentials.from_service_account_file(
            credential_path
        )
        self._sync_db: Client = firestore.Client(credentials=credentials)
        self._async_db: AsyncClient = firestore.AsyncClient(credentials=credentials)
    else:
        # Rely on GOOGLE_APPLICATION_CREDENTIALS
        self._sync_db: Client = firestore.Client()
        self._async_db: AsyncClient = firestore.AsyncClient()

    if self._auto_cleanup:
        self._start_cleanup_scheduler()
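
Example usage (a minimal sketch; assumes valid credentials via GOOGLE_APPLICATION_CREDENTIALS and the optional dependency installed):

    from fast_cache import FirestoreBackend

    # auto_cleanup=False skips the background job; expired entries then linger
    # until overwritten or until the cleanup scheduler is started manually.
    backend = FirestoreBackend(namespace="myapp", auto_cleanup=False)
    backend.set("config", {"feature": True}, expire=3600)
    print(backend.get("config"))
    backend.close()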

get

get(key)

Synchronously retrieves a value from the cache by key.

If the key does not exist or the entry has expired, returns None. If the entry is expired, it is not automatically deleted.

Parameters:

    key (str): The cache key to retrieve. Required.

Returns:

    Optional[Any]: The cached Python object, or None if not found or expired.

Notes
  • The value is deserialized using pickle.
  • Handles deserialization errors gracefully.
  • Thread-safe for Firestore client.
Source code in fast_cache/backends/google_firestore.py
def get(self, key: str) -> Optional[Any]:
    """
    Synchronously retrieves a value from the cache by key.

    If the key does not exist or the entry has expired, returns None. If the
    entry is expired, it is not automatically deleted.

    Args:
        key (str): The cache key to retrieve.

    Returns:
        Optional[Any]: The cached Python object, or None if not found or expired.

    Notes:
        - The value is deserialized using pickle.
        - Handles deserialization errors gracefully.
        - Thread-safe for Firestore client.
    """
    doc_ref = self._sync_db.collection(self._collection_name).document(
        self._make_key(key)
    )
    doc = doc_ref.get()
    if doc.exists:
        data = doc.to_dict()
        if not self._is_expired(data.get("expires_at")):
            try:
                return pickle.loads(data["value"])
            except (pickle.UnpicklingError, KeyError):
                return None
    return None

set

set(key, value, expire=None)

Synchronously stores a value in the cache under the specified key.

If the key already exists, its value and expiration time are updated. Optionally, an expiration time can be set, after which the entry will be considered expired and eligible for deletion.

Parameters:

    key (str): The cache key to store the value under. Required.
    value (Any): The Python object to cache. Required.
    expire (Optional[Union[int, timedelta]]): The expiration time for the cache entry, as an integer (seconds) or a timedelta. If None, the entry does not expire. Defaults to None.
Notes
  • The value is serialized using pickle.
  • Thread-safe for Firestore client.
Source code in fast_cache/backends/google_firestore.py
def set(
    self, key: str, value: Any, expire: Optional[Union[int, timedelta]] = None
) -> None:
    """
    Synchronously stores a value in the cache under the specified key.

    If the key already exists, its value and expiration time are updated.
    Optionally, an expiration time can be set, after which the entry will be
    considered expired and eligible for deletion.

    Args:
        key (str): The cache key to store the value under.
        value (Any): The Python object to cache.
        expire (Optional[Union[int, timedelta]], optional): The expiration time
            for the cache entry. Can be specified as an integer (seconds) or a
            timedelta. If None, the entry does not expire.

    Notes:
        - The value is serialized using pickle.
        - Thread-safe for Firestore client.
    """
    doc_ref = self._sync_db.collection(self._collection_name).document(
        self._make_key(key)
    )
    data = {"value": pickle.dumps(value)}
    exptime = self._compute_expire_at(expire)
    if exptime is not None:
        data["expires_at"] = exptime

    doc_ref.set(data)

delete

delete(key)

Synchronously deletes a cache entry by key.

If the key does not exist, this method does nothing.

Parameters:

    key (str): The cache key to delete. Required.
Notes
  • Thread-safe for Firestore client.
  • The key is automatically namespaced and hashed.
Source code in fast_cache/backends/google_firestore.py
def delete(self, key: str) -> None:
    """
    Synchronously deletes a cache entry by key.

    If the key does not exist, this method does nothing.

    Args:
        key (str): The cache key to delete.

    Notes:
        - Thread-safe for Firestore client.
        - The key is automatically namespaced and hashed.
    """
    doc_ref = self._sync_db.collection(self._collection_name).document(
        self._make_key(key)
    )
    doc_ref.delete()

clear

clear()

Synchronously removes all cache entries in the collection.

This method deletes all documents in the configured Firestore collection. Note that Firestore does not support direct namespace-based clearing, so all entries in the collection are removed.

Notes
  • Thread-safe for Firestore client.
  • This operation can be expensive if the collection is large.
  • For more granular clearing, consider adding a namespace field to documents.
Source code in fast_cache/backends/google_firestore.py
def clear(self) -> None:
    """
    Synchronously removes all cache entries in the collection.

    This method deletes all documents in the configured Firestore collection.
    Note that Firestore does not support direct namespace-based clearing, so
    all entries in the collection are removed.

    Notes:
        - Thread-safe for Firestore client.
        - This operation can be expensive if the collection is large.
        - For more granular clearing, consider adding a namespace field to documents.
    """
    docs = self._sync_db.collection(self._collection_name).stream()
    for doc in docs:
        doc.reference.delete()

has

has(key)

Synchronously checks if a cache key exists and is not expired.

Parameters:

    key (str): The cache key to check. Required.

Returns:

    bool: True if the key exists and is not expired, False otherwise.

Notes
  • Thread-safe for Firestore client.
  • Expired entries are not considered present.
Source code in fast_cache/backends/google_firestore.py
def has(self, key: str) -> bool:
    """
    Synchronously checks if a cache key exists and is not expired.

    Args:
        key (str): The cache key to check.

    Returns:
        bool: True if the key exists and is not expired, False otherwise.

    Notes:
        - Thread-safe for Firestore client.
        - Expired entries are not considered present.
    """
    doc_ref = self._sync_db.collection(self._collection_name).document(
        self._make_key(key)
    )
    doc = doc_ref.get()
    if doc.exists:
        data = doc.to_dict()
        return not self._is_expired(data.get("expires_at"))
    return False

aget async

aget(key)

Asynchronously retrieves a value from the cache by key.

If the key does not exist or the entry has expired, returns None. If the entry is expired, it is not automatically deleted.

Parameters:

    key (str): The cache key to retrieve. Required.

Returns:

    Optional[Any]: The cached Python object, or None if not found or expired.

Notes
  • The value is deserialized using pickle.
  • Handles deserialization errors gracefully.
  • Asyncio-safe for Firestore client.
Source code in fast_cache/backends/google_firestore.py
async def aget(self, key: str) -> Optional[Any]:
    """
    Asynchronously retrieves a value from the cache by key.

    If the key does not exist or the entry has expired, returns None. If the
    entry is expired, it is not automatically deleted.

    Args:
        key (str): The cache key to retrieve.

    Returns:
        Optional[Any]: The cached Python object, or None if not found or expired.

    Notes:
        - The value is deserialized using pickle.
        - Handles deserialization errors gracefully.
        - Asyncio-safe for Firestore client.
    """
    doc_ref = self._async_db.collection(self._collection_name).document(
        self._make_key(key)
    )
    doc = await doc_ref.get()
    if doc.exists:
        data = doc.to_dict()
        if not self._is_expired(data.get("expires_at")):
            try:
                return pickle.loads(data["value"])
            except (pickle.UnpicklingError, KeyError):
                # Handle potential deserialization errors or missing value field
                return None
    return None

aset async

aset(key, value, expire=None)

Asynchronously stores a value in the cache under the specified key.

If the key already exists, its value and expiration time are updated. Optionally, an expiration time can be set, after which the entry will be considered expired and eligible for deletion.

Parameters:

    key (str): The cache key to store the value under. Required.
    value (Any): The Python object to cache. Required.
    expire (Optional[Union[int, timedelta]]): The expiration time for the cache entry, as an integer (seconds) or a timedelta. If None, the entry does not expire. Defaults to None.
Notes
  • The value is serialized using pickle.
  • Asyncio-safe for Firestore client.
Source code in fast_cache/backends/google_firestore.py
async def aset(
    self, key: str, value: Any, expire: Optional[Union[int, timedelta]] = None
) -> None:
    """
    Asynchronously stores a value in the cache under the specified key.

    If the key already exists, its value and expiration time are updated.
    Optionally, an expiration time can be set, after which the entry will be
    considered expired and eligible for deletion.

    Args:
        key (str): The cache key to store the value under.
        value (Any): The Python object to cache.
        expire (Optional[Union[int, timedelta]], optional): The expiration time
            for the cache entry. Can be specified as an integer (seconds) or a
            timedelta. If None, the entry does not expire.

    Notes:
        - The value is serialized using pickle.
        - Asyncio-safe for Firestore client.
    """
    doc_ref = self._async_db.collection(self._collection_name).document(
        self._make_key(key)
    )
    data = {"value": pickle.dumps(value)}
    exptime = self._compute_expire_at(expire)

    if exptime is not None:
        data["expires_at"] = exptime

    await doc_ref.set(data)

adelete async

adelete(key)

Asynchronously deletes a cache entry by key.

If the key does not exist, this method does nothing.

Parameters:

    key (str): The cache key to delete. Required.
Notes
  • Asyncio-safe for Firestore client.
  • The key is automatically namespaced and hashed.
Source code in fast_cache/backends/google_firestore.py
async def adelete(self, key: str) -> None:
    """
    Asynchronously deletes a cache entry by key.

    If the key does not exist, this method does nothing.

    Args:
        key (str): The cache key to delete.

    Notes:
        - Asyncio-safe for Firestore client.
        - The key is automatically namespaced and hashed.
    """
    doc_ref = self._async_db.collection(self._collection_name).document(
        self._make_key(key)
    )
    await doc_ref.delete()

aclear async

aclear()

Asynchronously removes all cache entries in the collection.

This method deletes all documents in the configured Firestore collection. Note that Firestore does not support direct namespace-based clearing, so all entries in the collection are removed.

Notes
  • Asyncio-safe for Firestore client.
  • This operation can be expensive if the collection is large.
  • For more granular clearing, consider adding a namespace field to documents.
Source code in fast_cache/backends/google_firestore.py
async def aclear(self) -> None:
    """
    Asynchronously removes all cache entries in the collection.

    This method deletes all documents in the configured Firestore collection.
    Note that Firestore does not support direct namespace-based clearing, so
    all entries in the collection are removed.

    Notes:
        - Asyncio-safe for Firestore client.
        - This operation can be expensive if the collection is large.
        - For more granular clearing, consider adding a namespace field to documents.
    """
    docs = self._async_db.collection(self._collection_name).stream()
    async for doc in docs:
        await doc.reference.delete()

ahas async

ahas(key)

Asynchronously checks if a cache key exists and is not expired.

Parameters:

    key (str): The cache key to check. Required.

Returns:

    bool: True if the key exists and is not expired, False otherwise.

Notes
  • Asyncio-safe for Firestore client.
  • Expired entries are not considered present.
Source code in fast_cache/backends/google_firestore.py
async def ahas(self, key: str) -> bool:
    """
    Asynchronously checks if a cache key exists and is not expired.

    Args:
        key (str): The cache key to check.

    Returns:
        bool: True if the key exists and is not expired, False otherwise.

    Notes:
        - Asyncio-safe for Firestore client.
        - Expired entries are not considered present.
    """
    doc_ref = self._async_db.collection(self._collection_name).document(
        self._make_key(key)
    )
    doc = await doc_ref.get()
    if doc.exists:
        data = doc.to_dict()
        return not self._is_expired(data.get("expires_at"))
    return False

close

close()

Closes the synchronous Firestore client and stops the cleanup scheduler.

This method should be called when the backend is no longer needed to ensure all resources are released and background jobs are stopped.

Notes
  • After calling this method, the synchronous client is closed and cannot be used.
  • The background cleanup scheduler is stopped.
Source code in fast_cache/backends/google_firestore.py
def close(self) -> None:
    """
    Closes the synchronous Firestore client and stops the cleanup scheduler.

    This method should be called when the backend is no longer needed to ensure
    all resources are released and background jobs are stopped.

    Notes:
        - After calling this method, the synchronous client is closed and cannot be used.
        - The background cleanup scheduler is stopped.
    """
    self._stop_cleanup_scheduler()
    try:
        self._sync_db.close()
    except TypeError:
        return

aclose async

aclose()

Closes the asynchronous Firestore client and stops the cleanup scheduler.

This method should be called when the backend is no longer needed to ensure all resources are released and background jobs are stopped.

Notes
  • After calling this method, the asynchronous client is closed and cannot be used.
  • The background cleanup scheduler is stopped.
Source code in fast_cache/backends/google_firestore.py
async def aclose(self) -> None:
    """
    Closes the asynchronous Firestore client and stops the cleanup scheduler.

    This method should be called when the backend is no longer needed to ensure
    all resources are released and background jobs are stopped.

    Notes:
        - After calling this method, the asynchronous client is closed and cannot be used.
        - The background cleanup scheduler is stopped.
    """
    self._stop_cleanup_scheduler()
    try:
        await self._async_db.close()
    except TypeError:
        return

fast_cache.DynamoDBBackend

DynamoDBBackend(table_name, region_name, namespace='cache', aws_access_key_id=None, aws_secret_access_key=None, endpoint_url=None, create_table=True)

Bases: CacheBackend

DynamoDB cache backend implementation with namespace support.

Attributes:

    _namespace (str): Namespace prefix for all keys.
    _table_name (str): DynamoDB table name.
    _sync_client (client): Synchronous DynamoDB client.
    _async_client (client): Asynchronous DynamoDB client.
    _sync_resource (resource): Synchronous DynamoDB resource.
    _async_resource (resource): Asynchronous DynamoDB resource.

Initialize DynamoDB backend with table and connection settings.

Parameters:

    table_name (str): DynamoDB table name for cache storage. Required.
    region_name (str): AWS region name. Required.
    namespace (str): Namespace prefix for all keys. Defaults to "cache".
    aws_access_key_id (Optional[str]): AWS access key ID. Defaults to None.
    aws_secret_access_key (Optional[str]): AWS secret access key. Defaults to None.
    endpoint_url (Optional[str]): Custom endpoint URL (for local DynamoDB). Defaults to None.
    create_table (bool): Whether to create the table if it doesn't exist. Defaults to True.
Source code in fast_cache/backends/dynamodb.py
def __init__(
    self,
    table_name: str,
    region_name: str,
    namespace: str = "cache",
    aws_access_key_id: Optional[str] = None,
    aws_secret_access_key: Optional[str] = None,
    endpoint_url: Optional[str] = None,
    create_table: bool = True,
) -> None:
    """
    Initialize DynamoDB backend with table and connection settings.

    Args:
        table_name (str): DynamoDB table name for cache storage.
        namespace (str): Namespace prefix for all keys (default: "cache").
        region_name (str): AWS region name.
        aws_access_key_id (Optional[str]): AWS access key ID.
        aws_secret_access_key (Optional[str]): AWS secret access key.
        endpoint_url (Optional[str]): Custom endpoint URL (for local DynamoDB).
        create_table (bool): Whether to create table if it doesn't exist.
    """
    try:
        import boto3
        import aioboto3
    except ImportError:
        raise ImportError(
            "DynamoDBBackend requires the 'boto3' and 'aioboto3' packages. "
            "Install them with: pip install fast-cache[dynamodb]"
        )

    self._namespace = namespace
    self._table_name = table_name

    # Connection parameters
    self._connection_params = {
        "region_name": region_name,
        "endpoint_url": endpoint_url,
    }

    if aws_access_key_id and aws_secret_access_key:
        self._connection_params.update(
            {
                "aws_access_key_id": aws_access_key_id,
                "aws_secret_access_key": aws_secret_access_key,
            }
        )

    # Sync client for table management only
    self._sync_client = boto3.client("dynamodb", **self._connection_params)

    # Sync resource/table for sync cache operations
    self._sync_resource = boto3.resource("dynamodb", **self._connection_params)
    self._sync_table = self._sync_resource.Table(table_name)

    # Initialize async session
    self._async_resource = None
    self._async_table = None
    self._async_session = aioboto3.Session()

    # Create table if requested
    if create_table:
        self._ensure_table_exists()
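
Example usage (a minimal sketch; assumes DynamoDB Local on localhost:8000, which accepts dummy credentials):

    from fast_cache import DynamoDBBackend

    backend = DynamoDBBackend(
        table_name="cache",
        region_name="us-east-1",
        aws_access_key_id="dummy",
        aws_secret_access_key="dummy",
        endpoint_url="http://localhost:8000",  # omit for real AWS
        create_table=True,
    )
    backend.set("answer", 42, expire=60)
    print(backend.get("answer"))  # 42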

get

get(key)

Synchronously retrieve a value from the cache.

Parameters:

    key (str): The key to retrieve. Required.

Returns:

    Optional[Any]: The cached value, or None if not found.

Source code in fast_cache/backends/dynamodb.py
def get(self, key: str) -> Optional[Any]:
    """
    Synchronously retrieve a value from the cache.

    Args:
        key (str): The key to retrieve.

    Returns:
        Optional[Any]: The cached value, or None if not found.
    """
    try:
        response = self._sync_table.get_item(Key={"cache_key": self._make_key(key)})

        if "Item" not in response:
            return None

        item = response["Item"]

        # Check if item has expired and delete if so
        if self._is_expired(item):
            self.delete(key)
            return None
        value = self._deserialize_value(item["value"])
        return value
    except Exception:
        return None

aget async

aget(key)

Asynchronously retrieve a value from the cache.

Parameters:

    key (str): The key to retrieve. Required.

Returns:

    Optional[Any]: The cached value, or None if not found.

Source code in fast_cache/backends/dynamodb.py
async def aget(self, key: str) -> Optional[Any]:
    """
    Asynchronously retrieve a value from the cache.

    Args:
        key (str): The key to retrieve.

    Returns:
        Optional[Any]: The cached value, or None if not found.
    """
    try:
        table = await self._get_async_table()
        response = await table.get_item(Key={"cache_key": self._make_key(key)})

        if "Item" not in response:
            return None

        item = response["Item"]

        # Check if item has expired and delete if so
        if self._is_expired(item):
            await self.adelete(key)
            return None

        return self._deserialize_value(item["value"])
    except Exception:
        return None

set

set(key, value, expire=None)

Synchronously set a value in the cache.

Parameters:

    key (str): The key under which to store the value. Required.
    value (Any): The value to store. Required.
    expire (Optional[Union[int, timedelta]]): Expiration time in seconds or as timedelta. Defaults to None.
Source code in fast_cache/backends/dynamodb.py
def set(
    self, key: str, value: Any, expire: Optional[Union[int, timedelta]] = None
) -> None:
    """
    Synchronously set a value in the cache.

    Args:
        key (str): The key under which to store the value.
        value (Any): The value to store.
        expire (Optional[Union[int, timedelta]]): Expiration time in seconds or as timedelta.
    """
    try:
        item = self._build_item(key, value, expire)
        self._sync_table.put_item(Item=item)
    except Exception:
        pass
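
The has()/ahas() projections reference a "ttl" attribute, which suggests expiry is tracked in a numeric "ttl" field on each item. If you also want DynamoDB itself to purge expired items, native TTL can be enabled on that attribute (a sketch using plain boto3; the attribute name is an assumption, and _ensure_table_exists may already do this):

    import boto3

    client = boto3.client("dynamodb", region_name="us-east-1")
    client.update_time_to_live(
        TableName="cache",
        TimeToLiveSpecification={"Enabled": True, "AttributeName": "ttl"},
    )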

aset async

aset(key, value, expire=None)

Asynchronously set a value in the cache.

Parameters:

    key (str): The key under which to store the value. Required.
    value (Any): The value to store. Required.
    expire (Optional[Union[int, timedelta]]): Expiration time in seconds or as timedelta. Defaults to None.
Source code in fast_cache/backends/dynamodb.py
async def aset(
    self, key: str, value: Any, expire: Optional[Union[int, timedelta]] = None
) -> None:
    """
    Asynchronously set a value in the cache.

    Args:
        key (str): The key under which to store the value.
        value (Any): The value to store.
        expire (Optional[Union[int, timedelta]]): Expiration time in seconds or as timedelta.
    """
    try:
        table = await self._get_async_table()
        item = self._build_item(key, value, expire)
        await table.put_item(Item=item)
    except Exception:
        pass

delete

delete(key)

Synchronously delete a value from the cache.

Parameters:

    key (str): The key to delete. Required.
Source code in fast_cache/backends/dynamodb.py
def delete(self, key: str) -> None:
    """
    Synchronously delete a value from the cache.

    Args:
        key (str): The key to delete.
    """
    try:
        self._sync_table.delete_item(Key={"cache_key": self._make_key(key)})
    except Exception:
        pass

adelete async

adelete(key)

Asynchronously delete a value from the cache.

Parameters:

    key (str): The key to delete. Required.
Source code in fast_cache/backends/dynamodb.py
async def adelete(self, key: str) -> None:
    """
    Asynchronously delete a value from the cache.

    Args:
        key (str): The key to delete.
    """
    try:
        table = await self._get_async_table()
        await table.delete_item(Key={"cache_key": self._make_key(key)})
    except Exception:
        pass

has

has(key)

Synchronously check if a key exists in the cache.

Parameters:

    key (str): The key to check. Required.

Returns:

    bool: True if the key exists, False otherwise.

Source code in fast_cache/backends/dynamodb.py
def has(self, key: str) -> bool:
    """
    Synchronously check if a key exists in the cache.

    Args:
        key (str): The key to check.

    Returns:
        bool: True if the key exists, False otherwise.
    """
    try:
        response = self._sync_table.get_item(
            Key={"cache_key": self._make_key(key)},
            ProjectionExpression="cache_key, #ttl",
            ExpressionAttributeNames={"#ttl": "ttl"},
        )

        if "Item" not in response:
            return False

        item = response["Item"]

        # Check if item has expired and delete if so
        if self._is_expired(item):
            self.delete(key)
            return False

        return True
    except Exception:
        return False

ahas async

ahas(key)

Asynchronously check if a key exists in the cache.

Parameters:

    key (str): The key to check. Required.

Returns:

    bool: True if the key exists, False otherwise.

Source code in fast_cache/backends/dynamodb.py
async def ahas(self, key: str) -> bool:
    """
    Asynchronously check if a key exists in the cache.

    Args:
        key (str): The key to check.

    Returns:
        bool: True if the key exists, False otherwise.
    """
    try:
        table = await self._get_async_table()
        response = await table.get_item(
            Key={"cache_key": self._make_key(key)},
            ProjectionExpression="cache_key, #ttl",
            ExpressionAttributeNames={"#ttl": "ttl"},
        )

        if "Item" not in response:
            return False

        item = response["Item"]

        # Check if item has expired and delete if so
        if self._is_expired(item):
            await self.adelete(key)
            return False

        return True
    except Exception:
        return False

clear

clear()

Synchronously clear all values from the namespace.

Source code in fast_cache/backends/dynamodb.py
def clear(self) -> None:
    """
    Synchronously clear all values from the namespace.
    """
    try:
        # Scan for all items with the namespace prefix
        response = self._sync_table.scan(
            FilterExpression="begins_with(cache_key, :prefix)",
            ExpressionAttributeValues={":prefix": f"{self._namespace}:"},
            ProjectionExpression="cache_key",
        )

        # Delete items in batches
        if response.get("Items"):
            with self._sync_table.batch_writer() as batch:
                for item in response["Items"]:
                    batch.delete_item(Key={"cache_key": item["cache_key"]})

        # Handle pagination
        while "LastEvaluatedKey" in response:
            response = self._sync_table.scan(
                FilterExpression="begins_with(cache_key, :prefix)",
                ExpressionAttributeValues={":prefix": f"{self._namespace}:"},
                ProjectionExpression="cache_key",
                ExclusiveStartKey=response["LastEvaluatedKey"],
            )

            if response.get("Items"):
                with self._sync_table.batch_writer() as batch:
                    for item in response["Items"]:
                        batch.delete_item(Key={"cache_key": item["cache_key"]})

    except Exception:
        pass

aclear async

aclear()

Asynchronously clear all values from the namespace.

Source code in fast_cache/backends/dynamodb.py
async def aclear(self) -> None:
    """
    Asynchronously clear all values from the namespace.
    """
    try:
        table = await self._get_async_table()

        # Scan for all items with the namespace prefix
        response = await table.scan(
            FilterExpression="begins_with(cache_key, :prefix)",
            ExpressionAttributeValues={":prefix": f"{self._namespace}:"},
            ProjectionExpression="cache_key",
        )

        # Delete items in batches
        if response.get("Items"):
            async with table.batch_writer() as batch:
                for item in response["Items"]:
                    await batch.delete_item(Key={"cache_key": item["cache_key"]})

        # Handle pagination
        while "LastEvaluatedKey" in response:
            response = await table.scan(
                FilterExpression="begins_with(cache_key, :prefix)",
                ExpressionAttributeValues={":prefix": f"{self._namespace}:"},
                ProjectionExpression="cache_key",
                ExclusiveStartKey=response["LastEvaluatedKey"],
            )

            if response.get("Items"):
                async with table.batch_writer() as batch:
                    for item in response["Items"]:
                        await batch.delete_item(
                            Key={"cache_key": item["cache_key"]}
                        )

    except Exception:
        pass

close async

close()

Close DynamoDB connections and clean up resources.

Source code in fast_cache/backends/dynamodb.py
async def close(self) -> None:
    """
    Close DynamoDB connections and clean up resources.
    """
    if self._async_resource:
        await self._async_resource.__aexit__(None, None, None)
        self._async_resource = None
        self._async_table = None

Backend Base Class

fast_cache.backends.backend.CacheBackend

Bases: ABC

Abstract base class for cache backends.

All cache backend implementations must inherit from this class and implement both synchronous and asynchronous methods for cache operations.
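
For illustration, a minimal in-memory backend satisfying this interface might look like the sketch below (hypothetical class with simple time-based expiry; the async variants just delegate to the sync ones):

    import time
    from datetime import timedelta
    from typing import Any, Optional, Union

    from fast_cache.backends.backend import CacheBackend


    class InMemoryBackend(CacheBackend):
        def __init__(self) -> None:
            # key -> (value, absolute expiry as epoch seconds, or None)
            self._store: dict[str, tuple[Any, Optional[float]]] = {}

        def _expires_at(self, expire: Optional[Union[int, timedelta]]) -> Optional[float]:
            if expire is None:
                return None
            seconds = expire.total_seconds() if isinstance(expire, timedelta) else expire
            return time.time() + seconds

        def get(self, key: str) -> Optional[Any]:
            item = self._store.get(key)
            if item is None:
                return None
            value, expires_at = item
            if expires_at is not None and expires_at <= time.time():
                self._store.pop(key, None)  # drop stale entries lazily
                return None
            return value

        def set(self, key: str, value: Any, expire: Optional[Union[int, timedelta]] = None) -> None:
            self._store[key] = (value, self._expires_at(expire))

        def delete(self, key: str) -> None:
            self._store.pop(key, None)

        def clear(self) -> None:
            self._store.clear()

        def has(self, key: str) -> bool:
            return self.get(key) is not None

        async def aget(self, key: str) -> Optional[Any]:
            return self.get(key)

        async def aset(self, key: str, value: Any, expire: Optional[Union[int, timedelta]] = None) -> None:
            self.set(key, value, expire)

        async def adelete(self, key: str) -> None:
            self.delete(key)

        async def aclear(self) -> None:
            self.clear()

        async def ahas(self, key: str) -> bool:
            return self.has(key)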

aget abstractmethod async

aget(key)

Asynchronously retrieve a value from the cache.

Parameters:

    key (str): The key to retrieve. Required.

Returns:

    Optional[Any]: The cached value, or None if not found.

Source code in fast_cache/backends/backend.py
@abstractmethod
async def aget(self, key: str) -> Optional[Any]:
    """
    Asynchronously retrieve a value from the cache.

    Args:
        key (str): The key to retrieve.

    Returns:
        Optional[Any]: The cached value, or None if not found.
    """
    pass

get abstractmethod

get(key)

Synchronously retrieve a value from the cache.

Parameters:

    key (str): The key to retrieve. Required.

Returns:

    Optional[Any]: The cached value, or None if not found.

Source code in fast_cache/backends/backend.py
@abstractmethod
def get(self, key: str) -> Optional[Any]:
    """
    Synchronously retrieve a value from the cache.

    Args:
        key (str): The key to retrieve.

    Returns:
        Optional[Any]: The cached value, or None if not found.
    """
    pass

aset abstractmethod async

aset(key, value, expire=None)

Asynchronously set a value in the cache.

Parameters:

    key (str): The key under which to store the value. Required.
    value (Any): The value to store. Required.
    expire (Optional[Union[int, timedelta]]): Expiration time in seconds or as timedelta. Defaults to None.
Source code in fast_cache/backends/backend.py
@abstractmethod
async def aset(
    self, key: str, value: Any, expire: Optional[Union[int, timedelta]] = None
) -> None:
    """
    Asynchronously set a value in the cache.

    Args:
        key (str): The key under which to store the value.
        value (Any): The value to store.
        expire (Optional[Union[int, timedelta]]): Expiration time in seconds or as timedelta.
    """
    pass

set abstractmethod

set(key, value, expire=None)

Synchronously set a value in the cache.

Parameters:

    key (str): The key under which to store the value. Required.
    value (Any): The value to store. Required.
    expire (Optional[Union[int, timedelta]]): Expiration time in seconds or as timedelta. Defaults to None.
Source code in fast_cache/backends/backend.py
@abstractmethod
def set(
    self, key: str, value: Any, expire: Optional[Union[int, timedelta]] = None
) -> None:
    """
    Synchronously set a value in the cache.

    Args:
        key (str): The key under which to store the value.
        value (Any): The value to store.
        expire (Optional[Union[int, timedelta]]): Expiration time in seconds or as timedelta.
    """
    pass

adelete abstractmethod async

adelete(key)

Asynchronously delete a value from the cache.

Parameters:

    key (str): The key to delete. Required.
Source code in fast_cache/backends/backend.py
@abstractmethod
async def adelete(self, key: str) -> None:
    """
    Asynchronously delete a value from the cache.

    Args:
        key (str): The key to delete.
    """
    pass

delete abstractmethod

delete(key)

Synchronously delete a value from the cache.

Parameters:

    key (str): The key to delete. Required.
Source code in fast_cache/backends/backend.py
@abstractmethod
def delete(self, key: str) -> None:
    """
    Synchronously delete a value from the cache.

    Args:
        key (str): The key to delete.
    """
    pass

aclear abstractmethod async

aclear()

Asynchronously clear all values from the cache.

Source code in fast_cache/backends/backend.py
@abstractmethod
async def aclear(self) -> None:
    """
    Asynchronously clear all values from the cache.
    """
    pass

clear abstractmethod

clear()

Synchronously clear all values from the cache.

Source code in fast_cache/backends/backend.py
@abstractmethod
def clear(self) -> None:
    """
    Synchronously clear all values from the cache.
    """
    pass

ahas abstractmethod async

ahas(key)

Asynchronously check if a key exists in the cache.

Parameters:

    key (str): The key to check. Required.

Returns:

    bool: True if the key exists, False otherwise.

Source code in fast_cache/backends/backend.py
@abstractmethod
async def ahas(self, key: str) -> bool:
    """
    Asynchronously check if a key exists in the cache.

    Args:
        key (str): The key to check.

    Returns:
        bool: True if the key exists, False otherwise.
    """
    pass

has abstractmethod

has(key)

Synchronously check if a key exists in the cache.

Parameters:

    key (str): The key to check. Required.

Returns:

    bool: True if the key exists, False otherwise.

Source code in fast_cache/backends/backend.py
@abstractmethod
def has(self, key: str) -> bool:
    """
    Synchronously check if a key exists in the cache.

    Args:
        key (str): The key to check.

    Returns:
        bool: True if the key exists, False otherwise.
    """
    pass