Bulk Operations
This guide covers all batch operations available in the Teable Client Library for efficient handling of multiple records.
Batch Creation
Basic Batch Creation
from teable import TeableClient, TeableConfig
# Initialize client
client = TeableClient(TeableConfig(api_key="your_api_key"))
# Prepare records data
records_data = [
{
"Name": "Alice Smith",
"Email": "alice@example.com",
"Age": 25
},
{
"Name": "Bob Johnson",
"Email": "bob@example.com",
"Age": 35
}
]
# Create multiple records at once
batch_result = client.records.batch_create_records(
table_id="table_id",
records=records_data,
field_key_type="name", # Use field names instead of IDs
typecast=True # Enable automatic type conversion
)
print(f"Successfully created {batch_result.success_count} records")
if batch_result.failure_count > 0:
print(f"Failed to create {batch_result.failure_count} records")
for error in batch_result.errors:
print(f"Error in record {error.index}: {error.message}")
Batch Creation with Ordering
# Create records with specific ordering
batch_result = client.records.batch_create_records(
table_id="table_id",
records=records_data,
field_key_type="name",
typecast=True,
order={
"viewId": "view_id",
"position": "after",
"recordId": "anchor_record_id"
}
)
Batch Updates
Basic Batch Update
# Prepare update data
updates = [
{
"id": "record1_id",
"fields": {
"Status": "Completed",
"Progress": 100
}
},
{
"id": "record2_id",
"fields": {
"Status": "In Progress",
"Progress": 50
}
}
]
# Update multiple records at once
updated_records = client.records.batch_update_records(
table_id="table_id",
updates=updates,
field_key_type="name",
typecast=True
)
Batch Update with Ordering
# Update records with specific ordering
updated_records = client.records.batch_update_records(
table_id="table_id",
updates=updates,
field_key_type="name",
typecast=True,
order={
"viewId": "view_id",
"position": "after",
"recordId": "anchor_record_id"
}
)
Batch Deletion
# Delete multiple records at once
record_ids = ["record1_id", "record2_id", "record3_id"]
success = client.records.batch_delete_records(
table_id="table_id",
record_ids=record_ids
)
Validation Rules
Batch Size Limits
# Maximum 2000 records per batch operation
try:
records = [{"fields": {}} for _ in range(2001)] # Too many records
client.records.batch_create_records(
table_id="table_id",
records=records
)
except ValidationError as e:
print(f"Validation error: {str(e)}")
Field Validation
def validate_batch_data(records):
"""Validate batch data before sending to API."""
for i, record in enumerate(records):
if not isinstance(record, dict):
raise ValidationError(f"Record {i} must be a dictionary")
if "fields" not in record:
raise ValidationError(f"Record {i} missing 'fields' key")
fields = record["fields"]
if not isinstance(fields, dict):
raise ValidationError(f"Fields in record {i} must be a dictionary")
# Validate required fields
if "Name" in fields and not fields["Name"]:
raise ValidationError(f"Name in record {i} cannot be empty")
# Validate email format
if "Email" in fields:
email = fields["Email"]
if not "@" in email:
raise ValidationError(f"Invalid email in record {i}")
# Use validation
try:
validate_batch_data(records_data)
batch_result = client.records.batch_create_records(
table_id="table_id",
records=records_data
)
except ValidationError as e:
print(f"Validation failed: {str(e)}")
Error Handling
Basic Error Handling
from teable.exceptions import ValidationError, APIError
try:
batch_result = client.records.batch_create_records(
table_id="table_id",
records=records_data
)
except ValidationError as e:
print(f"Validation error: {str(e)}")
except APIError as e:
print(f"API error: {str(e)}")
print(f"Status code: {e.status_code}")
print(f"Error details: {e.details}")
Handling Partial Success
def handle_batch_operation(client, table_id, records):
"""Handle batch operation with partial success tracking."""
try:
batch_result = client.records.batch_create_records(
table_id=table_id,
records=records
)
# Handle successful records
if batch_result.success_count > 0:
print(f"Successfully processed {batch_result.success_count} records")
# Handle failed records
if batch_result.failure_count > 0:
print(f"Failed to process {batch_result.failure_count} records")
failed_records = []
for error in batch_result.errors:
print(f"Record {error.index}: {error.message}")
failed_records.append(records[error.index])
# Implement retry logic for failed records
return retry_failed_records(client, table_id, failed_records)
return batch_result
except APIError as e:
print(f"Batch operation failed: {str(e)}")
raise
Best Practices
1. Process in Batches
def process_large_dataset(client, table_id, records, batch_size=500):
"""Process large number of records in batches."""
results = []
# Split into smaller batches
for i in range(0, len(records), batch_size):
batch = records[i:i + batch_size]
try:
batch_result = client.records.batch_create_records(
table_id=table_id,
records=batch
)
results.append(batch_result)
except APIError as e:
print(f"Batch {i//batch_size + 1} failed: {str(e)}")
# Handle error or retry
return results
2. Implement Retry Logic
import time
from teable.exceptions import APIError
def retry_batch_operation(operation, max_retries=3):
"""Retry batch operation with exponential backoff."""
for attempt in range(max_retries):
try:
return operation()
except APIError as e:
if e.status_code == 429: # Rate limit
if attempt < max_retries - 1:
time.sleep(2 ** attempt) # Exponential backoff
continue
raise
raise Exception("Max retries exceeded")
# Usage
batch_result = retry_batch_operation(
lambda: client.records.batch_create_records(
table_id=table_id,
records=records_data
)
)
3. Validate Before Processing
def validate_and_process(client, table_id, records):
"""Validate and process records with error handling."""
# Group records by operation type
to_create = []
to_update = []
to_delete = []
for record in records:
try:
if "id" not in record:
# New record
validate_creation_data(record)
to_create.append(record)
elif record.get("_delete"):
# Record to delete
validate_record_id(record["id"])
to_delete.append(record["id"])
else:
# Record to update
validate_update_data(record)
to_update.append(record)
except ValidationError as e:
print(f"Validation failed for record: {str(e)}")
continue
# Process each group
results = {
"created": [],
"updated": [],
"deleted": [],
"failed": []
}
if to_create:
try:
batch_result = client.records.batch_create_records(
table_id=table_id,
records=to_create
)
results["created"] = batch_result.successful
except APIError as e:
print(f"Creation failed: {str(e)}")
if to_update:
try:
updated = client.records.batch_update_records(
table_id=table_id,
updates=to_update
)
results["updated"] = updated
except APIError as e:
print(f"Update failed: {str(e)}")
if to_delete:
try:
success = client.records.batch_delete_records(
table_id=table_id,
record_ids=to_delete
)
if success:
results["deleted"] = to_delete
except APIError as e:
print(f"Deletion failed: {str(e)}")
return results
4. Monitor Progress
```python from datetime import datetime
def process_with_monitoring(client, table_id, records, batch_size=500): """Process records with progress monitoring.""" start_time = datetime.now() total_batches = (len(records) + batch_size - 1) // batch_size processed = 0
print(f"Starting batch processing of {len(records)} records")
for i in range(0, len(records), batch_size):
batch_start = datetime.now()
batch = records[i:i + batch_size]
try:
batch_result = client.records.batch_create_records(
table_id=table_id,
records=batch
)
processed += batch_result.success_count
# Calculate progress
batch_num = i // batch_size + 1
progress = (batch_num / total_batches) * 100
elapsed = datetime.now() - start_time
batch_time = datetime.now() - batch_start
print(f"Batch {batch_num}/{total_batches} ({progress:.1f}%)")
print(f"Processed: {processed}/{len(records)}")
print(f"Batch time: {batch_time}")
print(f"Total time: {elapsed}")
except APIError as e:
print(f"Batch {batch_num} failed: {str(e)}")
total_time = datetime.now() - start_time
print(f"\nProcessing complete:")
print(f"Total records: {len(records)}")
print(f"Successfully processed: {processed}")
print(f"Total time: {total_time}")