Skip to content

Batch Operations

Batch read/write operations with auto-chunking and exponential backoff.

batch

Batch operations for Dynantic.

Provides batch_get, batch_save, batch_delete, and a BatchWriter context manager with auto-flush at the DynamoDB 25-item limit.

BatchWriter

Context manager for mixed batch put/delete operations with auto-flush.

Usage

with User.batch_writer() as batch:
    batch.save(user1)
    batch.save(user2)
    batch.delete(user_id="u3")

Source code in dynantic/batch.py
class BatchWriter:
    """
    Buffer mixed put/delete requests and flush them in DynamoDB-sized batches.

    Intended to be used as a context manager:

        with User.batch_writer() as batch:
            batch.save(user1)
            batch.save(user2)
            batch.delete(user_id="u3")
    """

    def __init__(
        self,
        model_cls: type[DynamoModel],
        client: Any,
        serializer: DynamoSerializer,
        table_name: str,
    ) -> None:
        self._model_cls = model_cls
        self._client = client
        self._serializer = serializer
        self._table_name = table_name
        # Pending write requests, flushed whenever the DynamoDB limit is hit.
        self._buffer: list[dict[str, Any]] = []

    def save(self, item: DynamoModel) -> None:
        """Add a PutItem request to the batch."""
        # Handle TTL conversion
        from .config import convert_ttl_fields

        data = item.model_dump(mode="python", exclude_none=True)
        convert_ttl_fields(data, item._meta)

        self._enqueue({"PutRequest": {"Item": self._serializer.to_dynamo(data)}})

    def delete(self, **key_values: Any) -> None:
        """Add a DeleteItem request to the batch."""
        self._enqueue({"DeleteRequest": {"Key": self._serializer.to_dynamo(key_values)}})

    def _enqueue(self, request: dict[str, Any]) -> None:
        # Buffer the request; flush as soon as the 25-item write limit is reached.
        self._buffer.append(request)
        if len(self._buffer) >= BATCH_WRITE_LIMIT:
            self._flush()

    def _flush(self) -> None:
        # Nothing buffered -> nothing to send.
        if not self._buffer:
            return

        pending = list(self._buffer)
        with handle_dynamo_errors(table_name=self._table_name):
            batch_write_with_retry(self._client, self._table_name, pending)
        self._buffer.clear()

    def __enter__(self) -> BatchWriter:
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        # Flush the remainder only on a clean exit; on error the partial
        # batch is not committed.
        if exc_type is None:
            self._flush()

save

save(item: DynamoModel) -> None

Add a PutItem request to the batch.

Source code in dynantic/batch.py
def save(self, item: DynamoModel) -> None:
    """Queue a PutItem request for *item*, flushing when the batch fills up."""
    # Handle TTL conversion
    from .config import convert_ttl_fields

    data = item.model_dump(mode="python", exclude_none=True)
    convert_ttl_fields(data, item._meta)

    self._buffer.append(
        {"PutRequest": {"Item": self._serializer.to_dynamo(data)}}
    )

    # DynamoDB caps batch_write_item at 25 requests per call.
    if len(self._buffer) >= BATCH_WRITE_LIMIT:
        self._flush()

delete

delete(**key_values: Any) -> None

Add a DeleteItem request to the batch.

Source code in dynantic/batch.py
def delete(self, **key_values: Any) -> None:
    """Queue a DeleteItem request keyed by *key_values*, flushing when full."""
    self._buffer.append(
        {"DeleteRequest": {"Key": self._serializer.to_dynamo(key_values)}}
    )

    # DynamoDB caps batch_write_item at 25 requests per call.
    if len(self._buffer) >= BATCH_WRITE_LIMIT:
        self._flush()

batch_write_with_retry

batch_write_with_retry(
    client: Any,
    table_name: str,
    requests: list[dict[str, Any]],
) -> None

Execute batch_write_item with chunking and exponential backoff.

Source code in dynantic/batch.py
def batch_write_with_retry(client: Any, table_name: str, requests: list[dict[str, Any]]) -> None:
    """Execute batch_write_item with chunking and exponential backoff."""
    # Walk the request list in DynamoDB-sized slices and dispatch each one.
    for start in range(0, len(requests), BATCH_WRITE_LIMIT):
        _execute_batch_write_chunk(
            client, table_name, requests[start : start + BATCH_WRITE_LIMIT]
        )

batch_get_with_retry

batch_get_with_retry(
    client: Any, table_name: str, keys: list[dict[str, Any]]
) -> list[dict[str, Any]]

Execute batch_get_item with chunking and exponential backoff.

Source code in dynantic/batch.py
def batch_get_with_retry(
    client: Any, table_name: str, keys: list[dict[str, Any]]
) -> list[dict[str, Any]]:
    """Execute batch_get_item with chunking and exponential backoff."""
    collected: list[dict[str, Any]] = []

    # Fetch the keys one DynamoDB-sized slice at a time, preserving order.
    start = 0
    while start < len(keys):
        chunk = keys[start : start + BATCH_GET_LIMIT]
        collected += _execute_batch_get_chunk(client, table_name, chunk)
        start += BATCH_GET_LIMIT

    return collected