Durable Primitives

Low-level building blocks for maximum control over durability, state management, and execution flow


Durable primitives provide direct access to AGNT5’s underlying durability engine.

They give you maximum control over execution flow, state management, and error handling.

Use these when you need custom durability patterns that go beyond the high-level building blocks.

Start with high-level building blocks (Tasks, Workflows, Agents) first. Use primitives only when you need specialized control.

What Durable Primitives Enable

Primitives provide fine-grained control over durable execution:

  • Custom retry logic with specialized error handling strategies
  • Manual state management with execution-scoped persistence
  • Direct external service calls with exactly-once guarantees
  • Parallel execution control with custom concurrency limits
  • Advanced workflow patterns not possible with high-level abstractions

Core Primitives

@durable.function

Single reliable operation with exactly-once guarantees and custom control:

from agnt5 import durable, DurableContext

@durable.function(
    name="data_processor",
    max_retries=5,
    retry_delay=2.0,
    timeout=300.0,
    deterministic=True
)
async def process_data(ctx: DurableContext, data: dict) -> dict:
    """Process data with custom durability controls."""
    # Manual state management
    await ctx.state.set("processing_started", True)
    
    # Durable external service call
    result = await ctx.call("processing_service", "transform", data)
    
    # Update state with progress
    await ctx.state.set("processed_count", len(result))
    
    # Durable sleep between operations
    await ctx.sleep(1.0)
    
    # Another service call
    enriched = await ctx.call("enrichment_service", "enrich", result)
    
    return {"processed": enriched, "count": len(enriched)}

# Function automatically gets durability features
result = await process_data({"items": [1, 2, 3]})

@durable.flow

Multi-step process coordination with checkpointing and parallel execution:

@durable.flow(
    name="data_pipeline",
    checkpoint_interval=1,  # Checkpoint after each step
    max_concurrent_steps=5,
    max_retries=3
)
async def data_pipeline(ctx: DurableContext, sources: list) -> dict:
    """Complex data pipeline with manual orchestration."""
    # Step 1: Initialize pipeline state
    await ctx.state.set("pipeline_started", True)
    
    # Step 2: Fetch data from multiple sources in parallel
    fetch_tasks = [
        ("fetch_source", fetch_data_source, source) 
        for source in sources
    ]
    
    raw_data = await ctx.spawn(fetch_tasks)
    await ctx.state.set("raw_data_count", len(raw_data))
    
    # Step 3: Process data sequentially with state tracking
    processed_data = []
    for i, data in enumerate(raw_data):
        processed = await ctx.call("processor_service", "process", data)
        processed_data.append(processed)
        
        # Update progress state
        await ctx.state.set("processed_items", i + 1)
    
    # Step 4: Generate final report
    report = await ctx.call("report_service", "generate", {
        "data": processed_data,
        "metadata": {
            "sources": len(sources),
            "processed": len(processed_data)
        }
    })
    
    return {"report": report, "summary": await ctx.state.get("processed_items")}

# Flow automatically checkpoints and recovers
result = await data_pipeline(["source1", "source2", "source3"])

@durable.object

Stateful entity with serialized access and custom persistence:

from agnt5 import durable, DurableObject, DurableContext

@durable.object
class OrderProcessor(DurableObject):
    """Order processing with custom state management."""
    
    def __init__(self, order_id: str):
        super().__init__(order_id)
        self.order_id = order_id
        self.status = "created"
        self.items = []
        self.processing_steps = []
        self.error_count = 0
    
    async def add_item(self, item_data: dict) -> dict:
        """Add item with automatic state persistence."""
        self.items.append(item_data)
        self.processing_steps.append(f"Added item: {item_data['id']}")
        
        # State automatically persists after method execution
        return {"added": True, "total_items": len(self.items)}
    
    async def process_order(self, ctx: DurableContext) -> dict:
        """Process order with custom external service integration."""
        try:
            self.status = "processing"
            self.processing_steps.append("Processing started")
            
            # Validate inventory with durable service call
            inventory_result = await ctx.call(
                "inventory_service", 
                "check_availability", 
                {"items": self.items}
            )
            
            if not inventory_result["available"]:
                self.status = "insufficient_inventory"
                return {"success": False, "error": "Insufficient inventory"}
            
            # Process payment
            payment_result = await ctx.call(
                "payment_service",
                "charge",
                {
                    "order_id": self.order_id,
                    "amount": self.calculate_total()
                }
            )
            
            if payment_result["success"]:
                self.status = "paid"
                self.processing_steps.append("Payment processed")
                
                # Schedule fulfillment
                fulfillment = await ctx.call(
                    "fulfillment_service",
                    "schedule",
                    {"order_id": self.order_id, "items": self.items}
                )
                
                self.status = "fulfilled"
                self.processing_steps.append("Fulfillment scheduled")
                
                return {
                    "success": True,
                    "order_id": self.order_id,
                    "fulfillment_id": fulfillment["id"]
                }
            else:
                self.status = "payment_failed"
                return {"success": False, "error": "Payment failed"}
                
        except Exception as e:
            self.error_count += 1
            self.status = "error"
            self.processing_steps.append(f"Error: {str(e)}")
            raise  # Re-raise for retry logic
    
    def calculate_total(self) -> float:
        """Calculate order total."""
        return sum(item.get("price", 0) * item.get("quantity", 1) for item in self.items)

# Object provides serialized access per instance
processor = await OrderProcessor.get_or_create("order_123")
await processor.add_item({"id": "item_1", "price": 25.0, "quantity": 2})
result = await processor.process_order(ctx)

DurableContext API

The DurableContext provides access to all durable operations:

External Service Calls

Make reliable calls to external services:

async def advanced_service_integration(ctx: DurableContext, data: dict) -> dict:
    """Advanced service integration with error handling."""
    
    # Simple service call
    result1 = await ctx.call("service_a", "method1", data)
    
    # Service call with custom retry handling
    try:
        result2 = await ctx.call("service_b", "method2", result1)
    except Exception as e:
        # Log error and use fallback
        await ctx.state.set("service_b_error", str(e))
        result2 = await ctx.call("fallback_service", "method2", result1)
    
    # Combine results
    combined = await ctx.call("combiner_service", "merge", {
        "result1": result1,
        "result2": result2
    })
    
    return combined

State Management

Manage execution-scoped state that persists across failures:

async def stateful_processing(ctx: DurableContext, batch_data: list) -> dict:
    """Processing with comprehensive state management."""
    
    # Initialize state
    await ctx.state.set("batch_size", len(batch_data))
    await ctx.state.set("processed_count", 0)
    await ctx.state.set("errors", [])
    
    processed_items = []
    
    for i, item in enumerate(batch_data):
        try:
            # Process item
            processed = await ctx.call("processor", "process_item", item)
            processed_items.append(processed)
            
            # Update progress
            await ctx.state.set("processed_count", i + 1)
            await ctx.state.set("last_processed_item", item["id"])
            
        except Exception as e:
            # Track errors in state
            errors = await ctx.state.get("errors", [])
            errors.append({"item_id": item["id"], "error": str(e)})
            await ctx.state.set("errors", errors)
    
    # Get final state summary
    final_count = await ctx.state.get("processed_count")
    error_list = await ctx.state.get("errors", [])
    
    return {
        "processed_items": processed_items,
        "total_processed": final_count,
        "errors": error_list,
        "success_rate": final_count / len(batch_data)
    }

Parallel Execution

Execute multiple operations concurrently with control:

async def parallel_data_processing(ctx: DurableContext, data_sources: list) -> dict:
    """Parallel processing with custom concurrency control."""
    
    # Parallel data fetching
    fetch_tasks = [
        ("fetch_data", fetch_from_source, source)
        for source in data_sources
    ]
    
    raw_datasets = await ctx.spawn(fetch_tasks)
    
    # Parallel processing with different functions
    processing_tasks = [
        ("process_dataset", process_dataset, dataset, i)
        for i, dataset in enumerate(raw_datasets)
    ]
    
    processed_datasets = await ctx.spawn(processing_tasks)
    
    # Parallel analysis
    analysis_tasks = [
        ("analyze_data", analyze_dataset, dataset)
        for dataset in processed_datasets
    ]
    
    analyses = await ctx.spawn(analysis_tasks)
    
    # Combine all results
    final_report = await ctx.call("report_generator", "combine", {
        "raw_count": len(raw_datasets),
        "processed_count": len(processed_datasets),
        "analyses": analyses
    })
    
    return final_report

Event Handling

Wait for external events with timeouts:

async def event_driven_workflow(ctx: DurableContext, workflow_id: str) -> dict:
    """Workflow that waits for external events."""
    
    # Start processing
    await ctx.state.set("workflow_status", "waiting_for_approval")
    
    # Wait for approval event
    try:
        approval = await ctx.wait_for_event(
            event_id=f"approval_{workflow_id}",
            timeout=86400.0  # 24 hours
        )
        
        if approval["approved"]:
            await ctx.state.set("workflow_status", "approved")
            
            # Process approved workflow
            result = await ctx.call("processor", "execute_approved", {
                "workflow_id": workflow_id,
                "approval_data": approval
            })
            
            await ctx.state.set("workflow_status", "completed")
            return {"success": True, "result": result}
        else:
            await ctx.state.set("workflow_status", "rejected")
            return {"success": False, "reason": approval["reason"]}
            
    except TimeoutError:
        await ctx.state.set("workflow_status", "timeout")
        return {"success": False, "reason": "Approval timeout"}

Object Management

Work with durable objects from functions and flows:

async def order_management_workflow(ctx: DurableContext, order_data: dict) -> dict:
    """Workflow that manages durable objects."""
    
    order_id = order_data["order_id"]
    
    # Get or create order processor object
    processor = await ctx.get_object(OrderProcessor, order_id)
    
    # Add items to order
    for item in order_data["items"]:
        await processor.add_item(item)
    
    # Process the order
    processing_result = await processor.process_order(ctx)
    
    if processing_result["success"]:
        # Create shipping object
        shipper = await ctx.get_object(ShippingProcessor, order_id)
        shipping_result = await shipper.schedule_shipment(ctx, processing_result)
        
        return {
            "order_processed": True,
            "shipping_scheduled": shipping_result["success"],
            "tracking_number": shipping_result.get("tracking_number")
        }
    else:
        return {
            "order_processed": False,
            "error": processing_result["error"]
        }

Advanced Configuration

Custom Retry Strategies

Implement specialized retry logic:

@durable.function(
    max_retries=10,
    retry_delay=1.0,
    deterministic=False  # Allow non-deterministic operations
)
async def resilient_api_integration(ctx: DurableContext, api_data: dict) -> dict:
    """API integration with custom retry strategy."""
    
    retry_count = await ctx.state.get("retry_count", 0)
    
    try:
        # Attempt API call
        result = await ctx.call("external_api", "process", api_data)
        
        # Reset retry count on success
        await ctx.state.set("retry_count", 0)
        return result
        
    except Exception as e:
        # Increment retry count
        retry_count += 1
        await ctx.state.set("retry_count", retry_count)
        
        # Custom retry delay based on error type
        if "rate_limit" in str(e):
            await ctx.sleep(retry_count * 60)  # Exponential backoff for rate limits
        elif "timeout" in str(e):
            await ctx.sleep(retry_count * 2)   # Linear backoff for timeouts
        else:
            await ctx.sleep(retry_count * 1.5) # Default backoff
        
        # Re-raise for automatic retry
        raise

Performance Optimization

Optimize primitive usage for high-performance scenarios:

@durable.flow(
    checkpoint_interval=10,  # Checkpoint every 10 steps
    max_concurrent_steps=20,  # High concurrency
    deterministic=True
)
async def high_performance_pipeline(ctx: DurableContext, large_dataset: list) -> dict:
    """High-performance processing pipeline."""
    
    # Batch processing for efficiency
    batch_size = 100
    batches = [large_dataset[i:i+batch_size] for i in range(0, len(large_dataset), batch_size)]
    
    # Process batches in parallel
    batch_tasks = [
        ("process_batch", process_data_batch, batch, i)
        for i, batch in enumerate(batches)
    ]
    
    batch_results = await ctx.spawn(batch_tasks)
    
    # Combine results efficiently
    final_result = await ctx.call("aggregator", "combine_batches", {
        "batches": batch_results,
        "total_items": len(large_dataset)
    })
    
    return final_result

async def process_data_batch(ctx: DurableContext, batch: list, batch_index: int) -> dict:
    """Process a single batch of data."""
    processed_items = []
    
    for item in batch:
        # Minimal state updates for performance
        processed = await ctx.call("fast_processor", "process", item)
        processed_items.append(processed)
    
    return {
        "batch_index": batch_index,
        "items": processed_items,
        "count": len(processed_items)
    }

Service Registration

Register primitives with the runtime:

from agnt5 import durable

# Create service
service = durable.service("advanced-service", "1.0.0")

# Register durable functions
service.add_function(process_data)
service.add_function(data_pipeline)

# Register durable objects
service.add_object(OrderProcessor)

# Get service configuration for runtime
config = service.get_service_config()

# Start service worker
worker = durable.get_worker(
    service_name="advanced-service",
    version="1.0.0",
    coordinator_endpoint="http://localhost:8080"
)

await worker.register_function(process_data)
await worker.register_function(data_pipeline)
await worker.register_object(OrderProcessor)
await worker.start()

Integration with High-Level Building Blocks

Primitives work seamlessly with high-level components:

With Tasks

Use primitives within tasks for advanced control:

from agnt5 import task

@task
def advanced_task_processing(data: dict) -> dict:
    """Task that uses durable primitives internally."""
    
    @durable.function
    async def internal_processing(ctx: DurableContext, data: dict) -> dict:
        # Use primitives for complex internal logic
        result = await ctx.call("complex_service", "process", data)
        await ctx.state.set("processing_complete", True)
        return result
    
    # Execute durable function within task
    return await internal_processing(data)

With Workflows

Enhance workflows with primitive-level control:

from agnt5 import workflow

@workflow
async def enhanced_workflow(input_data: dict) -> dict:
    """Workflow enhanced with durable primitives."""
    
    # Use durable flow for complex orchestration
    pipeline_result = await data_pipeline(input_data["sources"])
    
    # Use durable object for state management
    processor = await OrderProcessor.get_or_create(input_data["order_id"])
    order_result = await processor.process_order(ctx)
    
    return {
        "pipeline": pipeline_result,
        "order": order_result
    }

Common Patterns

Circuit Breaker Pattern

Implement circuit breaker with primitives:

@durable.function
async def circuit_breaker_service_call(ctx: DurableContext, service: str, method: str, data: dict) -> dict:
    """Service call with circuit breaker pattern."""
    
    circuit_key = f"circuit_{service}_{method}"
    failure_count = await ctx.state.get(f"{circuit_key}_failures", 0)
    last_failure = await ctx.state.get(f"{circuit_key}_last_failure")
    
    # Check if circuit is open
    if failure_count >= 5:
        if last_failure and (time.time() - last_failure) < 300:  # 5 minutes
            raise Exception(f"Circuit breaker open for {service}.{method}")
        else:
            # Reset circuit after timeout
            await ctx.state.set(f"{circuit_key}_failures", 0)
    
    try:
        result = await ctx.call(service, method, data)
        
        # Reset failure count on success
        await ctx.state.set(f"{circuit_key}_failures", 0)
        return result
        
    except Exception as e:
        # Increment failure count
        await ctx.state.set(f"{circuit_key}_failures", failure_count + 1)
        await ctx.state.set(f"{circuit_key}_last_failure", time.time())
        raise

Saga Pattern

Implement saga pattern for distributed transactions:

@durable.flow
async def saga_transaction(ctx: DurableContext, transaction_data: dict) -> dict:
    """Saga pattern for distributed transaction."""
    
    completed_steps = []
    
    try:
        # Step 1: Reserve inventory
        inventory_result = await ctx.call("inventory", "reserve", transaction_data["items"])
        completed_steps.append(("inventory", "reserve", inventory_result["reservation_id"]))
        
        # Step 2: Process payment
        payment_result = await ctx.call("payment", "charge", transaction_data["payment"])
        completed_steps.append(("payment", "charge", payment_result["transaction_id"]))
        
        # Step 3: Create order
        order_result = await ctx.call("orders", "create", transaction_data["order"])
        completed_steps.append(("orders", "create", order_result["order_id"]))
        
        await ctx.state.set("saga_status", "completed")
        return {"success": True, "order_id": order_result["order_id"]}
        
    except Exception as e:
        # Compensate completed steps in reverse order
        await ctx.state.set("saga_status", "compensating")
        
        for service, action, resource_id in reversed(completed_steps):
            try:
                if service == "inventory" and action == "reserve":
                    await ctx.call("inventory", "release", {"reservation_id": resource_id})
                elif service == "payment" and action == "charge":
                    await ctx.call("payment", "refund", {"transaction_id": resource_id})
                elif service == "orders" and action == "create":
                    await ctx.call("orders", "cancel", {"order_id": resource_id})
            except Exception as comp_error:
                # Log compensation failures
                await ctx.state.set(f"compensation_error_{service}", str(comp_error))
        
        await ctx.state.set("saga_status", "failed")
        return {"success": False, "error": str(e)}

Performance Considerations

Function vs Flow vs Object:

  • Use @durable.function for single operations with custom logic
  • Use @durable.flow for multi-step processes with parallel execution
  • Use @durable.object for stateful entities with multiple operations

State Management:

  • Minimize state size for better performance
  • Use appropriate checkpoint intervals
  • Clean up unused state regularly

Concurrency Control:

  • Configure max_concurrent_steps based on resource limits
  • Use object serialization to prevent race conditions
  • Implement backpressure for high-throughput scenarios

Best Practices

Primitive Selection:

  • Start with high-level building blocks first
  • Use primitives only when you need specialized control
  • Combine primitives with high-level components when appropriate

Error Handling:

  • Implement custom retry strategies for different error types
  • Use state to track error patterns and implement circuit breakers
  • Always clean up resources in compensation logic

State Management:

  • Keep state minimal and focused
  • Use descriptive state keys
  • Implement state cleanup strategies
  • Version your state schema for evolution

Performance:

  • Batch operations when possible
  • Use appropriate parallelism levels
  • Monitor primitive performance and adjust configuration

Next Steps

Durable primitives provide maximum control over AGNT5’s durability engine. Learn how they integrate with other components:

  • Tasks - Build on primitives with higher-level abstractions
  • Workflows - Combine primitives in complex orchestrations
  • Agents - Add AI capabilities to primitive-based applications
  • Entities - Use durable objects for stateful patterns

Ready to build with maximum control? Check out the Quick Start Guide to get started with durable primitives.