# Durable Primitives

Low-level building blocks for maximum control over durability, state management, and execution flow.
Durable primitives provide direct access to AGNT5’s underlying durability engine.
They give you maximum control over execution flow, state management, and error handling.
Use these when you need custom durability patterns that go beyond the high-level building blocks.
> **Note:** Start with the high-level building blocks (Tasks, Workflows, Agents) first. Use primitives only when you need specialized control.
## What Durable Primitives Enable
Primitives provide fine-grained control over durable execution:
- Custom retry logic with specialized error handling strategies
- Manual state management with execution-scoped persistence
- Direct external service calls with exactly-once guarantees
- Parallel execution control with custom concurrency limits
- Advanced workflow patterns not possible with high-level abstractions
## Core Primitives
### `@durable.function`
Single reliable operation with exactly-once guarantees and custom control:
```python
from agnt5 import durable, DurableContext

@durable.function(
    name="data_processor",
    max_retries=5,
    retry_delay=2.0,
    timeout=300.0,
    deterministic=True
)
async def process_data(ctx: DurableContext, data: dict) -> dict:
    """Process data with custom durability controls."""
    # Manual state management
    await ctx.state.set("processing_started", True)

    # Durable external service call
    result = await ctx.call("processing_service", "transform", data)

    # Update state with progress
    await ctx.state.set("processed_count", len(result))

    # Durable sleep between operations
    await ctx.sleep(1.0)

    # Another service call
    enriched = await ctx.call("enrichment_service", "enrich", result)

    return {"processed": enriched, "count": len(enriched)}

# The decorated function gains durability features automatically; the
# DurableContext is supplied by the runtime, not the caller
result = await process_data({"items": [1, 2, 3]})
```
### `@durable.flow`
Multi-step process coordination with checkpointing and parallel execution:
```python
@durable.flow(
    name="data_pipeline",
    checkpoint_interval=1,  # Checkpoint after each step
    max_concurrent_steps=5,
    max_retries=3
)
async def data_pipeline(ctx: DurableContext, sources: list) -> dict:
    """Complex data pipeline with manual orchestration."""
    # Step 1: Initialize pipeline state
    await ctx.state.set("pipeline_started", True)

    # Step 2: Fetch data from multiple sources in parallel
    fetch_tasks = [
        ("fetch_source", fetch_data_source, source)
        for source in sources
    ]
    raw_data = await ctx.spawn(fetch_tasks)
    await ctx.state.set("raw_data_count", len(raw_data))

    # Step 3: Process data sequentially with state tracking
    processed_data = []
    for i, data in enumerate(raw_data):
        processed = await ctx.call("processor_service", "process", data)
        processed_data.append(processed)

        # Update progress state
        await ctx.state.set("processed_items", i + 1)

    # Step 4: Generate final report
    report = await ctx.call("report_service", "generate", {
        "data": processed_data,
        "metadata": {
            "sources": len(sources),
            "processed": len(processed_data)
        }
    })

    return {"report": report, "summary": await ctx.state.get("processed_items")}

# The flow checkpoints automatically and recovers from failures
result = await data_pipeline(["source1", "source2", "source3"])
```
### `@durable.object`
Stateful entity with serialized access and custom persistence:
```python
from agnt5 import durable, DurableObject, DurableContext

@durable.object
class OrderProcessor(DurableObject):
    """Order processing with custom state management."""

    def __init__(self, order_id: str):
        super().__init__(order_id)
        self.order_id = order_id
        self.status = "created"
        self.items = []
        self.processing_steps = []
        self.error_count = 0

    async def add_item(self, item_data: dict) -> dict:
        """Add item with automatic state persistence."""
        self.items.append(item_data)
        self.processing_steps.append(f"Added item: {item_data['id']}")
        # State automatically persists after method execution
        return {"added": True, "total_items": len(self.items)}

    async def process_order(self, ctx: DurableContext) -> dict:
        """Process order with custom external service integration."""
        try:
            self.status = "processing"
            self.processing_steps.append("Processing started")

            # Validate inventory with a durable service call
            inventory_result = await ctx.call(
                "inventory_service",
                "check_availability",
                {"items": self.items}
            )
            if not inventory_result["available"]:
                self.status = "insufficient_inventory"
                return {"success": False, "error": "Insufficient inventory"}

            # Process payment
            payment_result = await ctx.call(
                "payment_service",
                "charge",
                {
                    "order_id": self.order_id,
                    "amount": self.calculate_total()
                }
            )

            if payment_result["success"]:
                self.status = "paid"
                self.processing_steps.append("Payment processed")

                # Schedule fulfillment
                fulfillment = await ctx.call(
                    "fulfillment_service",
                    "schedule",
                    {"order_id": self.order_id, "items": self.items}
                )
                self.status = "fulfilled"
                self.processing_steps.append("Fulfillment scheduled")

                return {
                    "success": True,
                    "order_id": self.order_id,
                    "fulfillment_id": fulfillment["id"]
                }
            else:
                self.status = "payment_failed"
                return {"success": False, "error": "Payment failed"}

        except Exception as e:
            self.error_count += 1
            self.status = "error"
            self.processing_steps.append(f"Error: {str(e)}")
            raise  # Re-raise for retry logic

    def calculate_total(self) -> float:
        """Calculate order total."""
        return sum(item.get("price", 0) * item.get("quantity", 1) for item in self.items)

# Each instance provides serialized access. Run this inside a durable
# function or flow, where ctx is available
processor = await OrderProcessor.get_or_create("order_123")
await processor.add_item({"id": "item_1", "price": 25.0, "quantity": 2})
result = await processor.process_order(ctx)
```
## DurableContext API

The `DurableContext` provides access to all durable operations:
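As an orientation aid, here is the context surface that the examples in this section actually use, collected in one place. This is a sketch inferred from those examples, not the canonical API; the exact signatures (and the `DurableContextShape` name) are assumptions:

```python
from typing import Any, Protocol

class StateStore(Protocol):
    """Execution-scoped key-value persistence, reached via ctx.state."""
    async def set(self, key: str, value: Any) -> None: ...
    async def get(self, key: str, default: Any = None) -> Any: ...

class DurableContextShape(Protocol):
    """The ctx methods exercised in this guide; signatures are assumptions."""
    state: StateStore
    async def call(self, service: str, method: str, payload: Any) -> Any: ...  # exactly-once service call
    async def sleep(self, seconds: float) -> None: ...                         # durable timer
    async def spawn(self, tasks: list) -> list: ...                            # parallel child tasks
    async def wait_for_event(self, event_id: str, timeout: float) -> Any: ...  # external signal
    async def get_object(self, cls: type, key: str) -> Any: ...                # durable object handle
```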
### External Service Calls
Make reliable calls to external services:
```python
async def advanced_service_integration(ctx: DurableContext, data: dict) -> dict:
    """Advanced service integration with error handling."""
    # Simple service call
    result1 = await ctx.call("service_a", "method1", data)

    # Service call with a custom fallback
    try:
        result2 = await ctx.call("service_b", "method2", result1)
    except Exception as e:
        # Record the error in state and use a fallback service
        await ctx.state.set("service_b_error", str(e))
        result2 = await ctx.call("fallback_service", "method2", result1)

    # Combine results
    combined = await ctx.call("combiner_service", "merge", {
        "result1": result1,
        "result2": result2
    })
    return combined
```
### State Management
Manage execution-scoped state that persists across failures:
```python
async def stateful_processing(ctx: DurableContext, batch_data: list) -> dict:
    """Processing with comprehensive state management."""
    # Initialize state
    await ctx.state.set("batch_size", len(batch_data))
    await ctx.state.set("processed_count", 0)
    await ctx.state.set("errors", [])

    processed_items = []
    for i, item in enumerate(batch_data):
        try:
            # Process item
            processed = await ctx.call("processor", "process_item", item)
            processed_items.append(processed)

            # Update progress
            await ctx.state.set("processed_count", i + 1)
            await ctx.state.set("last_processed_item", item["id"])
        except Exception as e:
            # Track errors in state
            errors = await ctx.state.get("errors", [])
            errors.append({"item_id": item["id"], "error": str(e)})
            await ctx.state.set("errors", errors)

    # Get final state summary
    final_count = await ctx.state.get("processed_count")
    error_list = await ctx.state.get("errors", [])

    return {
        "processed_items": processed_items,
        "total_processed": final_count,
        "errors": error_list,
        "success_rate": final_count / len(batch_data)
    }
```
### Parallel Execution
Execute multiple operations concurrently with control:
```python
async def parallel_data_processing(ctx: DurableContext, data_sources: list) -> dict:
    """Parallel processing with custom concurrency control."""
    # Parallel data fetching
    fetch_tasks = [
        ("fetch_data", fetch_from_source, source)
        for source in data_sources
    ]
    raw_datasets = await ctx.spawn(fetch_tasks)

    # Parallel processing with different functions
    processing_tasks = [
        ("process_dataset", process_dataset, dataset, i)
        for i, dataset in enumerate(raw_datasets)
    ]
    processed_datasets = await ctx.spawn(processing_tasks)

    # Parallel analysis
    analysis_tasks = [
        ("analyze_data", analyze_dataset, dataset)
        for dataset in processed_datasets
    ]
    analyses = await ctx.spawn(analysis_tasks)

    # Combine all results
    final_report = await ctx.call("report_generator", "combine", {
        "raw_count": len(raw_datasets),
        "processed_count": len(processed_datasets),
        "analyses": analyses
    })
    return final_report
```
### Event Handling
Wait for external events with timeouts:
```python
async def event_driven_workflow(ctx: DurableContext, workflow_id: str) -> dict:
    """Workflow that waits for external events."""
    # Start processing
    await ctx.state.set("workflow_status", "waiting_for_approval")

    # Wait for approval event
    try:
        approval = await ctx.wait_for_event(
            event_id=f"approval_{workflow_id}",
            timeout=86400.0  # 24 hours
        )

        if approval["approved"]:
            await ctx.state.set("workflow_status", "approved")

            # Process approved workflow
            result = await ctx.call("processor", "execute_approved", {
                "workflow_id": workflow_id,
                "approval_data": approval
            })

            await ctx.state.set("workflow_status", "completed")
            return {"success": True, "result": result}
        else:
            await ctx.state.set("workflow_status", "rejected")
            return {"success": False, "reason": approval["reason"]}

    except TimeoutError:
        await ctx.state.set("workflow_status", "timeout")
        return {"success": False, "reason": "Approval timeout"}
```
### Object Management
Work with durable objects from functions and flows:
```python
async def order_management_workflow(ctx: DurableContext, order_data: dict) -> dict:
    """Workflow that manages durable objects."""
    order_id = order_data["order_id"]

    # Get or create the order processor object
    processor = await ctx.get_object(OrderProcessor, order_id)

    # Add items to order
    for item in order_data["items"]:
        await processor.add_item(item)

    # Process the order
    processing_result = await processor.process_order(ctx)

    if processing_result["success"]:
        # Create a shipping object (ShippingProcessor is another
        # @durable.object, defined elsewhere)
        shipper = await ctx.get_object(ShippingProcessor, order_id)
        shipping_result = await shipper.schedule_shipment(ctx, processing_result)

        return {
            "order_processed": True,
            "shipping_scheduled": shipping_result["success"],
            "tracking_number": shipping_result.get("tracking_number")
        }
    else:
        return {
            "order_processed": False,
            "error": processing_result["error"]
        }
```
## Advanced Configuration
### Custom Retry Strategies
Implement specialized retry logic:
```python
@durable.function(
    max_retries=10,
    retry_delay=1.0,
    deterministic=False  # Allow non-deterministic operations
)
async def resilient_api_integration(ctx: DurableContext, api_data: dict) -> dict:
    """API integration with custom retry strategy."""
    retry_count = await ctx.state.get("retry_count", 0)

    try:
        # Attempt API call
        result = await ctx.call("external_api", "process", api_data)

        # Reset retry count on success
        await ctx.state.set("retry_count", 0)
        return result

    except Exception as e:
        # Increment retry count
        retry_count += 1
        await ctx.state.set("retry_count", retry_count)

        # Custom retry delay based on error type
        if "rate_limit" in str(e):
            await ctx.sleep(retry_count * 60)   # Long linear backoff for rate limits
        elif "timeout" in str(e):
            await ctx.sleep(retry_count * 2)    # Short linear backoff for timeouts
        else:
            await ctx.sleep(retry_count * 1.5)  # Default backoff

        # Re-raise for automatic retry
        raise
```
### Performance Optimization
Optimize primitive usage for high-performance scenarios:
```python
@durable.flow(
    checkpoint_interval=10,   # Checkpoint every 10 steps
    max_concurrent_steps=20,  # High concurrency
    deterministic=True
)
async def high_performance_pipeline(ctx: DurableContext, large_dataset: list) -> dict:
    """High-performance processing pipeline."""
    # Batch processing for efficiency
    batch_size = 100
    batches = [large_dataset[i:i+batch_size] for i in range(0, len(large_dataset), batch_size)]

    # Process batches in parallel
    batch_tasks = [
        ("process_batch", process_data_batch, batch, i)
        for i, batch in enumerate(batches)
    ]
    batch_results = await ctx.spawn(batch_tasks)

    # Combine results efficiently
    final_result = await ctx.call("aggregator", "combine_batches", {
        "batches": batch_results,
        "total_items": len(large_dataset)
    })
    return final_result

async def process_data_batch(ctx: DurableContext, batch: list, batch_index: int) -> dict:
    """Process a single batch of data."""
    processed_items = []
    for item in batch:
        # Minimal state updates for performance
        processed = await ctx.call("fast_processor", "process", item)
        processed_items.append(processed)

    return {
        "batch_index": batch_index,
        "items": processed_items,
        "count": len(processed_items)
    }
```
## Service Registration
Register primitives with the runtime:
```python
from agnt5 import durable

# Create service
service = durable.service("advanced-service", "1.0.0")

# Register durable functions
service.add_function(process_data)
service.add_function(data_pipeline)

# Register durable objects
service.add_object(OrderProcessor)

# Get service configuration for runtime
config = service.get_service_config()

# Start service worker
worker = durable.get_worker(
    service_name="advanced-service",
    version="1.0.0",
    coordinator_endpoint="http://localhost:8080"
)

await worker.register_function(process_data)
await worker.register_function(data_pipeline)
await worker.register_object(OrderProcessor)
await worker.start()
```
## Integration with High-Level Building Blocks
Primitives work seamlessly with high-level components:
### With Tasks
Use primitives within tasks for advanced control:
```python
from agnt5 import task, durable, DurableContext

@task
async def advanced_task_processing(data: dict) -> dict:
    """Task that uses durable primitives internally."""

    @durable.function
    async def internal_processing(ctx: DurableContext, data: dict) -> dict:
        # Use primitives for complex internal logic
        result = await ctx.call("complex_service", "process", data)
        await ctx.state.set("processing_complete", True)
        return result

    # Execute the durable function within the task (the task is async
    # so it can await the durable call)
    return await internal_processing(data)
```
### With Workflows
Enhance workflows with primitive-level control:
```python
from agnt5 import workflow

@workflow
async def enhanced_workflow(ctx, input_data: dict) -> dict:
    """Workflow enhanced with durable primitives."""
    # Use a durable flow for complex orchestration
    pipeline_result = await data_pipeline(input_data["sources"])

    # Use a durable object for state management; the workflow's context
    # (assumed here to arrive as the first parameter) is passed through
    processor = await OrderProcessor.get_or_create(input_data["order_id"])
    order_result = await processor.process_order(ctx)

    return {
        "pipeline": pipeline_result,
        "order": order_result
    }
```
## Common Patterns
### Circuit Breaker Pattern
Implement circuit breaker with primitives:
```python
import time

@durable.function
async def circuit_breaker_service_call(ctx: DurableContext, service: str, method: str, data: dict) -> dict:
    """Service call with circuit breaker pattern."""
    circuit_key = f"circuit_{service}_{method}"
    failure_count = await ctx.state.get(f"{circuit_key}_failures", 0)
    last_failure = await ctx.state.get(f"{circuit_key}_last_failure")

    # Check if circuit is open
    if failure_count >= 5:
        if last_failure and (time.time() - last_failure) < 300:  # 5 minutes
            raise Exception(f"Circuit breaker open for {service}.{method}")
        else:
            # Reset circuit after timeout
            await ctx.state.set(f"{circuit_key}_failures", 0)

    try:
        result = await ctx.call(service, method, data)
        # Reset failure count on success
        await ctx.state.set(f"{circuit_key}_failures", 0)
        return result
    except Exception:
        # Increment failure count
        await ctx.state.set(f"{circuit_key}_failures", failure_count + 1)
        await ctx.state.set(f"{circuit_key}_last_failure", time.time())
        raise
```
### Saga Pattern
Implement saga pattern for distributed transactions:
```python
@durable.flow
async def saga_transaction(ctx: DurableContext, transaction_data: dict) -> dict:
    """Saga pattern for a distributed transaction."""
    completed_steps = []

    try:
        # Step 1: Reserve inventory
        inventory_result = await ctx.call("inventory", "reserve", transaction_data["items"])
        completed_steps.append(("inventory", "reserve", inventory_result["reservation_id"]))

        # Step 2: Process payment
        payment_result = await ctx.call("payment", "charge", transaction_data["payment"])
        completed_steps.append(("payment", "charge", payment_result["transaction_id"]))

        # Step 3: Create order
        order_result = await ctx.call("orders", "create", transaction_data["order"])
        completed_steps.append(("orders", "create", order_result["order_id"]))

        await ctx.state.set("saga_status", "completed")
        return {"success": True, "order_id": order_result["order_id"]}

    except Exception as e:
        # Compensate completed steps in reverse order
        await ctx.state.set("saga_status", "compensating")

        for service, action, resource_id in reversed(completed_steps):
            try:
                if service == "inventory" and action == "reserve":
                    await ctx.call("inventory", "release", {"reservation_id": resource_id})
                elif service == "payment" and action == "charge":
                    await ctx.call("payment", "refund", {"transaction_id": resource_id})
                elif service == "orders" and action == "create":
                    await ctx.call("orders", "cancel", {"order_id": resource_id})
            except Exception as comp_error:
                # Log compensation failures
                await ctx.state.set(f"compensation_error_{service}", str(comp_error))

        await ctx.state.set("saga_status", "failed")
        return {"success": False, "error": str(e)}
```
## Performance Considerations

**Function vs Flow vs Object:**

- Use `@durable.function` for single operations with custom logic
- Use `@durable.flow` for multi-step processes with parallel execution
- Use `@durable.object` for stateful entities with multiple operations
**State Management:**

- Minimize state size for better performance
- Use appropriate checkpoint intervals
- Clean up unused state regularly (see the sketch after this list)
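For example, once a batch's results are aggregated, a flow can overwrite bulky intermediate keys so the persisted state stays small. A minimal sketch using only the `state.set` calls shown in this guide; if the SDK exposes an explicit delete or clear operation, prefer that:

```python
@durable.function
async def cleanup_after_batch(ctx: DurableContext, batch_id: str) -> None:
    """Release per-batch state once results are aggregated."""
    # Overwrite bulky intermediate values with empty tombstones so the
    # persisted state stays small between checkpoints
    await ctx.state.set("raw_data", None)
    await ctx.state.set("errors", [])
    await ctx.state.set(f"batch_{batch_id}_done", True)
```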
**Concurrency Control:**

- Configure `max_concurrent_steps` based on resource limits
- Use object serialization to prevent race conditions
- Implement backpressure for high-throughput scenarios (see the sketch below)
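One simple way to add backpressure on top of `ctx.spawn` is to submit work in fixed-size waves, so no more than `wave_size` child tasks are in flight at once. This is a sketch built only from the `spawn` and `state` usage shown earlier; `process_item_fn` is a hypothetical worker function:

```python
@durable.flow
async def throttled_pipeline(ctx: DurableContext, items: list, wave_size: int = 10) -> list:
    """Process items in fixed-size waves to bound in-flight work."""
    results = []
    for start in range(0, len(items), wave_size):
        wave = [
            ("process_item", process_item_fn, item)  # process_item_fn: hypothetical worker
            for item in items[start:start + wave_size]
        ]
        # Each spawn call completes before the next wave begins, so at
        # most wave_size child tasks run concurrently
        results.extend(await ctx.spawn(wave))
        await ctx.state.set("completed", len(results))  # durable progress marker
    return results
```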
## Best Practices

**Primitive Selection:**

- Start with high-level building blocks first
- Use primitives only when you need specialized control
- Combine primitives with high-level components when appropriate

**Error Handling:**

- Implement custom retry strategies for different error types
- Use state to track error patterns and implement circuit breakers
- Always clean up resources in compensation logic

**State Management:**

- Keep state minimal and focused
- Use descriptive state keys
- Implement state cleanup strategies
- Version your state schema for evolution (see the sketch below)
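For instance, embedding a version field in stored values lets newer code migrate records written by older code at read time. A minimal sketch, assuming nothing beyond the `state.get`/`state.set` calls used throughout this guide:

```python
STATE_VERSION = 2

async def load_progress(ctx: DurableContext) -> dict:
    """Read progress state, migrating records written under the v1 schema."""
    record = await ctx.state.get("progress")
    if record is None:
        return {"version": STATE_VERSION, "processed": 0, "errors": []}
    if record.get("version", 1) == 1:
        # v1 records had no error list; upgrade and persist the new shape
        record = {"version": 2, "processed": record.get("processed", 0), "errors": []}
        await ctx.state.set("progress", record)
    return record
```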
**Performance:**

- Batch operations when possible
- Use appropriate parallelism levels
- Monitor primitive performance and adjust configuration
## Next Steps
Durable primitives provide maximum control over AGNT5’s durability engine. Learn how they integrate with other components:
- Tasks - Build on primitives with higher-level abstractions
- Workflows - Combine primitives in complex orchestrations
- Agents - Add AI capabilities to primitive-based applications
- Entities - Use durable objects for stateful patterns
Ready to build with maximum control? Check out the Quick Start Guide to get started with durable primitives.