Batch operations for Dynantic.
Provides batch_get, batch_save, batch_delete, and a BatchWriter
context manager with auto-flush at the DynamoDB 25-item limit.
BatchWriter
Context manager for mixed batch put/delete operations with auto-flush.
Usage
with User.batch_writer() as batch:
batch.save(user1)
batch.save(user2)
batch.delete(user_id="u3")
Source code in dynantic/batch.py
| class BatchWriter:
"""
Context manager for mixed batch put/delete operations with auto-flush.
Usage:
with User.batch_writer() as batch:
batch.save(user1)
batch.save(user2)
batch.delete(user_id="u3")
"""
def __init__(
self,
model_cls: type[DynamoModel],
client: Any,
serializer: DynamoSerializer,
table_name: str,
) -> None:
self._model_cls = model_cls
self._client = client
self._serializer = serializer
self._table_name = table_name
self._buffer: list[dict[str, Any]] = []
def save(self, item: DynamoModel) -> None:
"""Add a PutItem request to the batch."""
data = item.model_dump(mode="python", exclude_none=True)
# Handle TTL conversion
from .config import convert_ttl_fields
convert_ttl_fields(data, item._meta)
dynamo_item = self._serializer.to_dynamo(data)
self._buffer.append({"PutRequest": {"Item": dynamo_item}})
if len(self._buffer) >= BATCH_WRITE_LIMIT:
self._flush()
def delete(self, **key_values: Any) -> None:
"""Add a DeleteItem request to the batch."""
dynamo_key = self._serializer.to_dynamo(key_values)
self._buffer.append({"DeleteRequest": {"Key": dynamo_key}})
if len(self._buffer) >= BATCH_WRITE_LIMIT:
self._flush()
def _flush(self) -> None:
if not self._buffer:
return
with handle_dynamo_errors(table_name=self._table_name):
batch_write_with_retry(self._client, self._table_name, list(self._buffer))
self._buffer.clear()
def __enter__(self) -> BatchWriter:
return self
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
if exc_type is None:
self._flush()
|
save
save(item: DynamoModel) -> None
Add a PutItem request to the batch.
Source code in dynantic/batch.py
| def save(self, item: DynamoModel) -> None:
"""Add a PutItem request to the batch."""
data = item.model_dump(mode="python", exclude_none=True)
# Handle TTL conversion
from .config import convert_ttl_fields
convert_ttl_fields(data, item._meta)
dynamo_item = self._serializer.to_dynamo(data)
self._buffer.append({"PutRequest": {"Item": dynamo_item}})
if len(self._buffer) >= BATCH_WRITE_LIMIT:
self._flush()
|
delete
delete(**key_values: Any) -> None
Add a DeleteItem request to the batch.
Source code in dynantic/batch.py
| def delete(self, **key_values: Any) -> None:
"""Add a DeleteItem request to the batch."""
dynamo_key = self._serializer.to_dynamo(key_values)
self._buffer.append({"DeleteRequest": {"Key": dynamo_key}})
if len(self._buffer) >= BATCH_WRITE_LIMIT:
self._flush()
|
batch_write_with_retry
batch_write_with_retry(
client: Any,
table_name: str,
requests: list[dict[str, Any]],
) -> None
Execute batch_write_item with chunking and exponential backoff.
Source code in dynantic/batch.py
| def batch_write_with_retry(client: Any, table_name: str, requests: list[dict[str, Any]]) -> None:
"""Execute batch_write_item with chunking and exponential backoff."""
for i in range(0, len(requests), BATCH_WRITE_LIMIT):
chunk = requests[i : i + BATCH_WRITE_LIMIT]
_execute_batch_write_chunk(client, table_name, chunk)
|
batch_get_with_retry
batch_get_with_retry(
client: Any, table_name: str, keys: list[dict[str, Any]]
) -> list[dict[str, Any]]
Execute batch_get_item with chunking and exponential backoff.
Source code in dynantic/batch.py
| def batch_get_with_retry(
client: Any, table_name: str, keys: list[dict[str, Any]]
) -> list[dict[str, Any]]:
"""Execute batch_get_item with chunking and exponential backoff."""
all_items: list[dict[str, Any]] = []
for i in range(0, len(keys), BATCH_GET_LIMIT):
chunk = keys[i : i + BATCH_GET_LIMIT]
items = _execute_batch_get_chunk(client, table_name, chunk)
all_items.extend(items)
return all_items
|