Workflows

Orchestrate complex multi-step operations with durability guarantees, parallel execution, and state management


Workflows orchestrate multiple tasks and provide coordination for complex operations.

They maintain shared context, handle dependencies between tasks, and ensure reliable execution of multi-step processes.

Every workflow provides durability guarantees - your entire process completes successfully even if individual steps fail or infrastructure restarts.

What Workflows Enable

Workflows coordinate complex operations across multiple tasks:

  • Chain tasks with conditional logic and dependencies
  • Run tasks in parallel or sequential patterns for optimal performance
  • Share state and data between multiple tasks with persistent context
  • Handle complex business processes with multiple coordinated steps
  • React to external events and triggers with event-driven patterns

Key Features

Task Orchestration

Coordinate execution order and dependencies across multiple operations:

from agnt5 import workflow, task

@task
def validate_order(order_data: dict) -> dict:
    """Validate order data and return cleaned version."""
    return clean_and_validate(order_data)

@task
def check_inventory(items: list) -> dict:
    """Check inventory availability for items.""" 
    return {"available": True, "reserved_items": items}

@task
def process_payment(payment_info: dict) -> dict:
    """Process payment and return transaction ID."""
    return {"transaction_id": "txn_123", "status": "completed"}

@workflow
async def order_fulfillment(order_data: dict) -> dict:
    """Complete order fulfillment workflow."""
    # Step 1: Validate order (runs first)
    validated_order = await validate_order(order_data)
    
    # Step 2: Check inventory and process payment (run in parallel)  
    inventory_result = await check_inventory(validated_order["items"])
    payment_result = await process_payment(validated_order["payment"])
    
    # Step 3: Finalize order (runs after previous steps complete)
    return {
        "order_id": validated_order["id"],
        "inventory": inventory_result,
        "payment": payment_result,
        "status": "fulfilled"
    }

Shared Context

Maintain consistent state across all tasks within the workflow:

@workflow
async def data_processing_pipeline(input_data: dict) -> dict:
    """Process data through multiple stages with shared context."""
    context = {
        "start_time": datetime.now(),
        "batch_id": generate_batch_id(),
        "processed_count": 0
    }
    
    # Each task can access and modify shared context
    stage1_result = await process_stage1(input_data, context)
    stage2_result = await process_stage2(stage1_result, context)
    final_result = await process_stage3(stage2_result, context)
    
    return {
        "result": final_result,
        "batch_id": context["batch_id"],
        "duration": datetime.now() - context["start_time"],
        "processed_count": context["processed_count"]
    }

Failure Isolation

Continue execution when individual tasks fail with proper error boundaries:

@workflow
async def resilient_data_sync(data_sources: list) -> dict:
    """Sync data from multiple sources with failure isolation."""
    results = {"successful": [], "failed": []}
    
    for source in data_sources:
        try:
            # Each source sync is isolated
            sync_result = await sync_data_source(source)
            results["successful"].append({
                "source": source,
                "result": sync_result
            })
        except Exception as e:
            # Failure in one source doesn't stop others
            results["failed"].append({
                "source": source,
                "error": str(e)
            })
    
    return results

Event-Driven Patterns

React to external triggers and webhooks:

@workflow
async def webhook_processor(event_data: dict) -> dict:
    """Process webhook events with conditional logic."""
    event_type = event_data.get("type")
    
    if event_type == "user.created":
        return await handle_user_creation(event_data)
    elif event_type == "payment.completed":
        return await handle_payment_completion(event_data)
    elif event_type == "order.cancelled":
        return await handle_order_cancellation(event_data)
    else:
        # Handle unknown event types
        return await log_unknown_event(event_data)

Hierarchical Structure

Nest workflows for complex operations:

@workflow
async def customer_onboarding(customer_data: dict) -> dict:
    """Complete customer onboarding process."""
    # Sub-workflow for account setup
    account_result = await account_setup_flow(customer_data)
    
    # Sub-workflow for verification
    verification_result = await verification_flow(account_result)
    
    # Sub-workflow for welcome sequence
    welcome_result = await welcome_sequence_flow(verification_result)
    
    return {
        "customer_id": account_result["customer_id"],
        "onboarding_complete": True,
        "verification_status": verification_result["status"],
        "welcome_sent": welcome_result["sent"]
    }

Creating Workflows

Decorator Approach

The simplest way to create a workflow:

from agnt5 import workflow

@workflow
async def simple_workflow(input_data: str) -> dict:
    """Simple workflow with sequential task execution."""
    step1_result = await task1(input_data)
    step2_result = await task2(step1_result)
    step3_result = await task3(step2_result)
    
    return {"final_result": step3_result}

Workflow with Configuration

Configure durability, timeouts, and retry behavior:

@workflow(
    name="complex_workflow",
    description="Complex multi-step process with configuration",
    enable_durability=True,
    timeout=1800.0  # 30 minutes
)
async def complex_workflow(input_data: dict) -> dict:
    """Configured workflow with custom settings."""
    return await process_complex_operation(input_data)

Class-Based Workflows

Use class-based workflows for complex orchestration with advanced features:

from agnt5 import Workflow

class DataProcessingWorkflow(Workflow):
    """Advanced workflow with class-based implementation."""
    
    def __init__(self):
        super().__init__(name="data_processing_pipeline")
        self.processed_count = 0
    
    async def run(self, data: list[str]) -> dict:
        """Execute the complete data processing workflow."""
        # Step 1: Validate all input data
        validated = await self.step("validate", self.validate_data, data)
        
        # Step 2: Process data in parallel batches
        batch_results = await self.parallel([
            ("analyze_batch", self.analyze_data, batch) 
            for batch in chunk_data(validated, batch_size=100)
        ])
        
        # Step 3: Combine results
        final_result = await self.step("combine", self.combine_results, batch_results)
        
        return {
            "processed_items": len(data),
            "batch_count": len(batch_results),
            "result": final_result
        }
    
    async def validate_data(self, data: list[str]) -> list[str]:
        """Validate input data."""
        return [item for item in data if is_valid(item)]
    
    async def analyze_data(self, batch: list[str]) -> dict:
        """Analyze a batch of data."""
        analysis_result = perform_analysis(batch)
        self.processed_count += len(batch)
        return analysis_result
    
    async def combine_results(self, batch_results: list[dict]) -> dict:
        """Combine results from all batches."""
        return aggregate_analysis_results(batch_results)

Advanced Workflow Methods

Class-based workflows provide advanced orchestration methods:

class AdvancedWorkflow(Workflow):
    async def run(self, input_data: dict) -> dict:
        # Execute a named step with automatic checkpointing
        result1 = await self.step("process_data", self.process_step, input_data)
        
        # Execute multiple tasks in parallel
        parallel_results = await self.parallel([
            ("task_a", self.task_a, result1),
            ("task_b", self.task_b, result1),
            ("task_c", self.task_c, result1),
        ])
        
        # Call an agent within the workflow
        agent_response = await self.call_agent(
            "analysis_agent", 
            f"Analyze these results: {parallel_results}"
        )
        
        # Call a tool within the workflow
        tool_result = await self.call_tool(
            "notification_tool",
            message=f"Workflow complete: {agent_response}"
        )
        
        return {
            "results": parallel_results,
            "analysis": agent_response,
            "notification": tool_result
        }

Parallel Execution Patterns

Batch Processing

Process multiple items concurrently:

@workflow
async def batch_processor(items: list) -> dict:
    """Process items in parallel batches."""
    batch_size = 10
    batches = [items[i:i+batch_size] for i in range(0, len(items), batch_size)]
    
    # Process all batches in parallel
    batch_results = await asyncio.gather(*[
        process_batch(batch) for batch in batches
    ])
    
    return {
        "total_items": len(items),
        "batch_count": len(batches),
        "results": batch_results
    }

Fan-Out/Fan-In Pattern

Distribute work across multiple tasks and collect results:

@workflow  
async def fan_out_fan_in(input_data: dict) -> dict:
    """Fan-out work to multiple processors, then fan-in results."""
    # Fan-out: distribute work
    parallel_tasks = [
        processor_a(input_data),
        processor_b(input_data), 
        processor_c(input_data)
    ]
    
    # Execute in parallel
    results = await asyncio.gather(*parallel_tasks)
    
    # Fan-in: combine results
    combined_result = await combine_processor_results(results)
    
    return {"combined": combined_result}

Conditional Parallel Execution

Run tasks conditionally based on runtime decisions:

@workflow
async def conditional_workflow(config: dict) -> dict:
    """Execute tasks conditionally based on configuration."""
    results = {}
    
    # Always run core processing
    core_result = await core_processing(config["input"])
    results["core"] = core_result
    
    # Conditional parallel execution
    optional_tasks = []
    
    if config.get("enable_analytics"):
        optional_tasks.append(("analytics", run_analytics, core_result))
    
    if config.get("enable_notifications"):  
        optional_tasks.append(("notifications", send_notifications, core_result))
    
    if config.get("enable_backup"):
        optional_tasks.append(("backup", create_backup, core_result))
    
    # Run optional tasks in parallel if any are enabled
    if optional_tasks:
        optional_results = await asyncio.gather(*[
            task_func(task_input) for _, task_func, task_input in optional_tasks
        ])
        
        for i, (task_name, _, _) in enumerate(optional_tasks):
            results[task_name] = optional_results[i]
    
    return results

Integration with Agents

Workflows can orchestrate multiple agents for complex AI operations:

from agnt5 import workflow, Agent

@workflow
async def content_creation_pipeline(topic: str) -> dict:
    """Create content using multiple specialized agents."""
    # Research agent gathers information
    researcher = Agent(
        name="researcher",
        model="gpt-4o",
        system_prompt="You research topics and provide comprehensive information."
    )
    research = await researcher.run(f"Research information about: {topic}")
    
    # Writer agent creates content
    writer = Agent(
        name="writer", 
        model="gpt-4o",
        system_prompt="You write engaging articles based on research."
    )
    article = await writer.run(f"Write an article about {topic} using this research: {research}")
    
    # Editor agent reviews and improves content
    editor = Agent(
        name="editor",
        model="gpt-4o", 
        system_prompt="You edit and improve articles for clarity and engagement."
    )
    final_article = await editor.run(f"Edit and improve this article: {article}")
    
    return {
        "topic": topic,
        "research": research,
        "draft": article,
        "final_article": final_article
    }

Multi-Agent Coordination

Coordinate multiple agents with shared context:

class MultiAgentWorkflow(Workflow):
    def __init__(self):
        super().__init__("multi_agent_coordination")
        
        # Initialize specialized agents
        self.analyst = Agent(name="analyst", model="gpt-4o")
        self.strategist = Agent(name="strategist", model="gpt-4o")
        self.implementer = Agent(name="implementer", model="gpt-4o")
    
    async def run(self, business_challenge: str) -> dict:
        """Coordinate multiple agents to solve business challenge."""
        # Step 1: Analysis
        analysis = await self.call_agent(
            "analyst",
            f"Analyze this business challenge: {business_challenge}"
        )
        
        # Step 2: Strategy development
        strategy = await self.call_agent(
            "strategist", 
            f"Develop strategy based on this analysis: {analysis}"
        )
        
        # Step 3: Implementation planning
        implementation = await self.call_agent(
            "implementer",
            f"Create implementation plan for this strategy: {strategy}"
        )
        
        return {
            "challenge": business_challenge,
            "analysis": analysis,
            "strategy": strategy,
            "implementation_plan": implementation
        }

Integration with Tools

Workflows can orchestrate tool usage for external system integration:

from agnt5 import workflow, tool

@tool
def send_email(recipient: str, subject: str, body: str) -> dict:
    """Send email notification."""
    return {"sent": True, "recipient": recipient}

@tool
def update_database(record_id: str, data: dict) -> dict:
    """Update database record."""
    return {"updated": True, "record_id": record_id}

@tool
def generate_report(data: dict) -> str:
    """Generate formatted report."""
    return f"Report generated for {len(data)} items"

@workflow
async def order_completion_workflow(order_id: str) -> dict:
    """Complete order with external system integration."""
    # Get order details
    order = await get_order_details(order_id)
    
    # Update order status in database
    db_result = await update_database(order_id, {"status": "completed"})
    
    # Generate completion report  
    report = await generate_report(order)
    
    # Send notification email
    email_result = await send_email(
        recipient=order["customer_email"],
        subject=f"Order {order_id} Complete",
        body=f"Your order has been completed. {report}"
    )
    
    return {
        "order_id": order_id,
        "database_updated": db_result["updated"],
        "report_generated": len(report) > 0,
        "notification_sent": email_result["sent"]
    }

Error Handling and Recovery

Retry Strategies

Implement custom retry logic for workflow steps:

@workflow
async def resilient_workflow(data: dict) -> dict:
    """Workflow with custom retry strategies."""
    max_retries = 3
    
    for attempt in range(max_retries):
        try:
            # Critical step that might fail
            result = await critical_processing_step(data)
            break
        except TransientError as e:
            if attempt == max_retries - 1:
                # Last attempt failed, try fallback
                result = await fallback_processing(data)
            else:
                # Wait before retry with exponential backoff
                await asyncio.sleep(2 ** attempt)
                continue
        except PermanentError:
            # Permanent error, use fallback immediately
            result = await fallback_processing(data)
            break
    
    return {"result": result}

Partial Success Handling

Handle scenarios where some steps succeed and others fail:

@workflow
async def partial_success_workflow(items: list) -> dict:
    """Handle partial success gracefully."""
    successful = []
    failed = []
    
    for item in items:
        try:
            result = await process_item(item)
            successful.append({"item": item, "result": result})
        except Exception as e:
            failed.append({"item": item, "error": str(e)})
    
    # Continue with successful items
    if successful:
        summary = await generate_summary([s["result"] for s in successful])
    else:
        summary = "No items processed successfully"
    
    return {
        "successful_count": len(successful),
        "failed_count": len(failed), 
        "successful_items": successful,
        "failed_items": failed,
        "summary": summary
    }

Compensation Patterns

Implement compensation logic for rollback scenarios:

@workflow
async def transactional_workflow(operation_data: dict) -> dict:
    """Workflow with compensation pattern for rollback."""
    completed_steps = []
    
    try:
        # Step 1: Reserve resources
        reservation = await reserve_resources(operation_data)
        completed_steps.append(("reserve", reservation))
        
        # Step 2: Process payment
        payment = await process_payment(operation_data)
        completed_steps.append(("payment", payment))
        
        # Step 3: Fulfill order
        fulfillment = await fulfill_order(operation_data)
        completed_steps.append(("fulfillment", fulfillment))
        
        return {"success": True, "steps": completed_steps}
        
    except Exception as e:
        # Compensate completed steps in reverse order
        for step_name, step_result in reversed(completed_steps):
            try:
                if step_name == "fulfillment":
                    await cancel_fulfillment(step_result)
                elif step_name == "payment":
                    await refund_payment(step_result)
                elif step_name == "reserve":
                    await release_resources(step_result)
            except Exception as comp_error:
                # Log compensation failures but continue
                log_compensation_error(step_name, comp_error)
        
        return {"success": False, "error": str(e), "compensated": True}

Common Patterns

ETL (Extract, Transform, Load) Pipeline

Implement data pipeline workflows:

@workflow
async def etl_pipeline(source_config: dict) -> dict:
    """Extract, Transform, Load data pipeline."""
    # Extract phase
    raw_data = await extract_data(source_config)
    
    # Transform phase - process in parallel batches
    batch_size = 1000
    batches = chunk_data(raw_data, batch_size)
    
    transformed_batches = await asyncio.gather(*[
        transform_batch(batch) for batch in batches
    ])
    
    # Flatten results
    transformed_data = [item for batch in transformed_batches for item in batch]
    
    # Load phase  
    load_result = await load_data(transformed_data, source_config["destination"])
    
    return {
        "extracted_count": len(raw_data),
        "transformed_count": len(transformed_data),
        "loaded_count": load_result["count"],
        "pipeline_success": True
    }

Approval Workflow

Implement approval processes with human-in-the-loop:

@workflow
async def approval_workflow(request_data: dict) -> dict:
    """Workflow with approval steps."""
    # Step 1: Validate request
    validation = await validate_request(request_data)
    if not validation["valid"]:
        return {"approved": False, "reason": validation["errors"]}
    
    # Step 2: Auto-approval for small amounts
    if request_data["amount"] < 1000:
        approval = await auto_approve(request_data)
    else:
        # Step 3: Human approval for larger amounts
        approval = await request_human_approval(request_data)
        
        # Wait for approval (with timeout)
        approval_result = await wait_for_approval(
            approval["approval_id"], 
            timeout=86400  # 24 hours
        )
        approval = approval_result
    
    # Step 4: Process approved request
    if approval["approved"]:
        processing_result = await process_approved_request(request_data)
        return {
            "approved": True,
            "approval_id": approval["approval_id"], 
            "processed": processing_result["success"]
        }
    else:
        return {
            "approved": False,
            "reason": approval["reason"]
        }

Event Processing Workflow

Process events with conditional logic:

@workflow
async def event_processing_workflow(event: dict) -> dict:
    """Process events based on type and content."""
    event_type = event.get("type")
    event_data = event.get("data", {})
    
    # Common validation for all events
    validation = await validate_event_format(event)
    if not validation["valid"]:
        return {"processed": False, "error": validation["error"]}
    
    # Type-specific processing
    if event_type == "user_action":
        result = await process_user_action(event_data)
    elif event_type == "system_alert":
        result = await process_system_alert(event_data)
    elif event_type == "business_event":
        result = await process_business_event(event_data)
    else:
        result = await process_unknown_event(event)
    
    # Common post-processing
    await log_event_processing(event, result)
    await update_metrics(event_type, result["success"])
    
    return {
        "event_id": event.get("id"),
        "event_type": event_type,
        "processed": result["success"],
        "result": result
    }

Performance Optimization

Workflow Batching

Process multiple similar workflows efficiently:

@workflow
async def batch_workflow_processor(workflow_requests: list) -> list:
    """Process multiple similar workflows efficiently."""
    # Group requests by type for batch processing
    grouped_requests = group_by_type(workflow_requests)
    
    results = []
    for request_type, requests in grouped_requests.items():
        if request_type == "data_processing":
            # Process data requests in parallel
            batch_results = await asyncio.gather(*[
                data_processing_workflow(req) for req in requests
            ])
        elif request_type == "notification":
            # Batch notifications for efficiency  
            batch_results = await batch_notification_workflow(requests)
        else:
            # Process other types individually
            batch_results = await asyncio.gather(*[
                generic_workflow(req) for req in requests
            ])
        
        results.extend(batch_results)
    
    return results

Resource Management

Manage resources efficiently across workflow steps:

@workflow
async def resource_managed_workflow(data: dict) -> dict:
    """Workflow with efficient resource management."""
    # Acquire shared resources at workflow level
    async with acquire_database_connection() as db:
        async with acquire_api_client() as api_client:
            # Step 1: Data retrieval (reuse connections)
            raw_data = await fetch_data_from_db(db, data["query"])
            
            # Step 2: External enrichment (reuse API client)
            enriched_data = await enrich_with_api(api_client, raw_data)
            
            # Step 3: Process and store (reuse connections)
            processed = await process_data(enriched_data)
            result = await store_results(db, processed)
            
            return {"processed_count": len(processed), "stored": result}

Best Practices

Workflow Design:

  • Define clear input/output interfaces
  • Ensure workflows can be safely rerun
  • Balance durability with performance
  • Minimize shared state between tasks

Error Handling:

  • Handle partial failures appropriately
  • Configure retries based on error types
  • Implement rollback for transactional operations
  • Include comprehensive logging and metrics

Performance:

  • Use parallel patterns where possible
  • Share connections and resources efficiently
  • Group similar operations for efficiency
  • Handle large datasets appropriately

Maintainability:

  • Document workflow purpose and steps clearly
  • Break complex workflows into smaller components
  • Include unit and integration tests
  • Implement observability for production workflows

Next Steps

Workflows provide the orchestration layer for complex operations. Learn how they integrate with other AGNT5 components:

  • Tasks - Build the durable functions workflows orchestrate
  • Agents - Add AI decision-making to workflow steps
  • Tools - Connect workflows to external systems
  • Entities - Maintain stateful context across workflow executions

Ready to orchestrate your first workflow? Check out the Quick Start Guide to get started building complex operations.