Workflows
Orchestrate complex multi-step operations with durability guarantees, parallel execution, and state management
Workflows orchestrate multiple tasks and coordinate complex operations.
They maintain shared context, handle dependencies between tasks, and ensure reliable execution of multi-step processes.
Every workflow provides durability guarantees: if individual steps fail or infrastructure restarts, the process resumes from its last completed step and runs to completion instead of starting over.
What Workflows Enable
Workflows coordinate complex operations across multiple tasks:
- Chain tasks with conditional logic and dependencies
- Run tasks in parallel or sequentially for optimal performance
- Share state and data between multiple tasks with persistent context
- Handle complex business processes with multiple coordinated steps
- React to external events and triggers with event-driven patterns
Key Features
Task Orchestration
Coordinate execution order and dependencies across multiple operations:
import asyncio

from agnt5 import workflow, task
@task
def validate_order(order_data: dict) -> dict:
"""Validate order data and return cleaned version."""
return clean_and_validate(order_data)
@task
def check_inventory(items: list) -> dict:
"""Check inventory availability for items."""
return {"available": True, "reserved_items": items}
@task
def process_payment(payment_info: dict) -> dict:
"""Process payment and return transaction ID."""
return {"transaction_id": "txn_123", "status": "completed"}
@workflow
async def order_fulfillment(order_data: dict) -> dict:
"""Complete order fulfillment workflow."""
# Step 1: Validate order (runs first)
validated_order = await validate_order(order_data)
    # Step 2: Check inventory and process payment concurrently
    inventory_result, payment_result = await asyncio.gather(
        check_inventory(validated_order["items"]),
        process_payment(validated_order["payment"]),
    )
# Step 3: Finalize order (runs after previous steps complete)
return {
"order_id": validated_order["id"],
"inventory": inventory_result,
"payment": payment_result,
"status": "fulfilled"
}
Shared Context
Maintain consistent state across all tasks within the workflow:
from datetime import datetime

@workflow
async def data_processing_pipeline(input_data: dict) -> dict:
"""Process data through multiple stages with shared context."""
context = {
"start_time": datetime.now(),
"batch_id": generate_batch_id(),
"processed_count": 0
}
# Each task can access and modify shared context
stage1_result = await process_stage1(input_data, context)
stage2_result = await process_stage2(stage1_result, context)
final_result = await process_stage3(stage2_result, context)
    return {
        "result": final_result,
        "batch_id": context["batch_id"],
        "duration_seconds": (datetime.now() - context["start_time"]).total_seconds(),
        "processed_count": context["processed_count"]
    }
Failure Isolation
Keep a workflow running when individual tasks fail by isolating errors around each step:
@workflow
async def resilient_data_sync(data_sources: list) -> dict:
"""Sync data from multiple sources with failure isolation."""
results = {"successful": [], "failed": []}
for source in data_sources:
try:
# Each source sync is isolated
sync_result = await sync_data_source(source)
results["successful"].append({
"source": source,
"result": sync_result
})
except Exception as e:
# Failure in one source doesn't stop others
results["failed"].append({
"source": source,
"error": str(e)
})
return results
Event-Driven Patterns
React to external triggers and webhooks:
@workflow
async def webhook_processor(event_data: dict) -> dict:
"""Process webhook events with conditional logic."""
event_type = event_data.get("type")
if event_type == "user.created":
return await handle_user_creation(event_data)
elif event_type == "payment.completed":
return await handle_payment_completion(event_data)
elif event_type == "order.cancelled":
return await handle_order_cancellation(event_data)
else:
# Handle unknown event types
return await log_unknown_event(event_data)
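As the number of event types grows, the if/elif chain can be replaced with a dispatch table. A minimal sketch reusing the hypothetical handlers from the example above:

# Handler registry: maps event types to the handler tasks defined elsewhere.
EVENT_HANDLERS = {
    "user.created": handle_user_creation,
    "payment.completed": handle_payment_completion,
    "order.cancelled": handle_order_cancellation,
}

@workflow
async def webhook_router(event_data: dict) -> dict:
    """Route webhook events through a dispatch table."""
    handler = EVENT_HANDLERS.get(event_data.get("type"), log_unknown_event)
    return await handler(event_data)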
Hierarchical Structure
Nest workflows for complex operations:
@workflow
async def customer_onboarding(customer_data: dict) -> dict:
"""Complete customer onboarding process."""
# Sub-workflow for account setup
account_result = await account_setup_flow(customer_data)
# Sub-workflow for verification
verification_result = await verification_flow(account_result)
# Sub-workflow for welcome sequence
welcome_result = await welcome_sequence_flow(verification_result)
return {
"customer_id": account_result["customer_id"],
"onboarding_complete": True,
"verification_status": verification_result["status"],
"welcome_sent": welcome_result["sent"]
}
Creating Workflows
Decorator Approach
The simplest way to create a workflow:
from agnt5 import workflow
@workflow
async def simple_workflow(input_data: str) -> dict:
"""Simple workflow with sequential task execution."""
step1_result = await task1(input_data)
step2_result = await task2(step1_result)
step3_result = await task3(step2_result)
return {"final_result": step3_result}
Workflow with Configuration
Configure the workflow's name, description, durability, and timeout through the decorator:
@workflow(
name="complex_workflow",
description="Complex multi-step process with configuration",
enable_durability=True,
timeout=1800.0 # 30 minutes
)
async def complex_workflow(input_data: dict) -> dict:
"""Configured workflow with custom settings."""
return await process_complex_operation(input_data)
Class-Based Workflows
Use class-based workflows for complex orchestration with advanced features:
from agnt5 import Workflow
class DataProcessingWorkflow(Workflow):
"""Advanced workflow with class-based implementation."""
def __init__(self):
super().__init__(name="data_processing_pipeline")
self.processed_count = 0
async def run(self, data: list[str]) -> dict:
"""Execute the complete data processing workflow."""
# Step 1: Validate all input data
validated = await self.step("validate", self.validate_data, data)
# Step 2: Process data in parallel batches
batch_results = await self.parallel([
("analyze_batch", self.analyze_data, batch)
for batch in chunk_data(validated, batch_size=100)
])
# Step 3: Combine results
final_result = await self.step("combine", self.combine_results, batch_results)
return {
"processed_items": len(data),
"batch_count": len(batch_results),
"result": final_result
}
async def validate_data(self, data: list[str]) -> list[str]:
"""Validate input data."""
return [item for item in data if is_valid(item)]
async def analyze_data(self, batch: list[str]) -> dict:
"""Analyze a batch of data."""
analysis_result = perform_analysis(batch)
self.processed_count += len(batch)
return analysis_result
async def combine_results(self, batch_results: list[dict]) -> dict:
"""Combine results from all batches."""
return aggregate_analysis_results(batch_results)
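This example assumes helpers such as is_valid, perform_analysis, aggregate_analysis_results, and chunk_data that stand in for your own logic. A minimal chunk_data might look like:

def chunk_data(items: list, batch_size: int) -> list[list]:
    """Split a list into consecutive batches of at most batch_size items."""
    return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]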
Advanced Workflow Methods
Class-based workflows provide advanced orchestration methods:
class AdvancedWorkflow(Workflow):
async def run(self, input_data: dict) -> dict:
# Execute a named step with automatic checkpointing
result1 = await self.step("process_data", self.process_step, input_data)
# Execute multiple tasks in parallel
parallel_results = await self.parallel([
("task_a", self.task_a, result1),
("task_b", self.task_b, result1),
("task_c", self.task_c, result1),
])
# Call an agent within the workflow
agent_response = await self.call_agent(
"analysis_agent",
f"Analyze these results: {parallel_results}"
)
# Call a tool within the workflow
tool_result = await self.call_tool(
"notification_tool",
message=f"Workflow complete: {agent_response}"
)
return {
"results": parallel_results,
"analysis": agent_response,
"notification": tool_result
}
Parallel Execution Patterns
Batch Processing
Process multiple items concurrently:
import asyncio

@workflow
async def batch_processor(items: list) -> dict:
"""Process items in parallel batches."""
batch_size = 10
batches = [items[i:i+batch_size] for i in range(0, len(items), batch_size)]
# Process all batches in parallel
batch_results = await asyncio.gather(*[
process_batch(batch) for batch in batches
])
return {
"total_items": len(items),
"batch_count": len(batches),
"results": batch_results
}
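An unbounded asyncio.gather starts every batch at once, which can overwhelm downstream systems. A sketch of the same pattern with a concurrency cap using asyncio.Semaphore from the standard library (process_batch is the same hypothetical task as above):

import asyncio

@workflow
async def bounded_batch_processor(items: list, max_concurrent: int = 5) -> dict:
    """Process batches in parallel, but at most max_concurrent at a time."""
    batch_size = 10
    batches = [items[i:i + batch_size] for i in range(0, len(items), batch_size)]
    semaphore = asyncio.Semaphore(max_concurrent)

    async def run_batch(batch: list):
        # Each batch waits for a free slot before running
        async with semaphore:
            return await process_batch(batch)

    batch_results = await asyncio.gather(*[run_batch(batch) for batch in batches])
    return {
        "total_items": len(items),
        "batch_count": len(batches),
        "results": batch_results
    }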
Fan-Out/Fan-In Pattern
Distribute work across multiple tasks and collect results:
import asyncio

@workflow
async def fan_out_fan_in(input_data: dict) -> dict:
"""Fan-out work to multiple processors, then fan-in results."""
# Fan-out: distribute work
parallel_tasks = [
processor_a(input_data),
processor_b(input_data),
processor_c(input_data)
]
# Execute in parallel
results = await asyncio.gather(*parallel_tasks)
# Fan-in: combine results
combined_result = await combine_processor_results(results)
return {"combined": combined_result}
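If one processor failing should not abort the whole fan-out, asyncio.gather can return exceptions alongside results instead of raising. A sketch using the same hypothetical processors:

import asyncio

@workflow
async def fault_tolerant_fan_out(input_data: dict) -> dict:
    """Fan-out where individual processor failures are collected, not raised."""
    outcomes = await asyncio.gather(
        processor_a(input_data),
        processor_b(input_data),
        processor_c(input_data),
        return_exceptions=True,  # failed branches come back as exception objects
    )
    successes = [o for o in outcomes if not isinstance(o, Exception)]
    failures = [str(o) for o in outcomes if isinstance(o, Exception)]
    combined = await combine_processor_results(successes) if successes else None
    return {"combined": combined, "failures": failures}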
Conditional Parallel Execution
Run tasks conditionally based on runtime decisions:
import asyncio

@workflow
async def conditional_workflow(config: dict) -> dict:
"""Execute tasks conditionally based on configuration."""
results = {}
# Always run core processing
core_result = await core_processing(config["input"])
results["core"] = core_result
# Conditional parallel execution
optional_tasks = []
if config.get("enable_analytics"):
optional_tasks.append(("analytics", run_analytics, core_result))
if config.get("enable_notifications"):
optional_tasks.append(("notifications", send_notifications, core_result))
if config.get("enable_backup"):
optional_tasks.append(("backup", create_backup, core_result))
# Run optional tasks in parallel if any are enabled
if optional_tasks:
optional_results = await asyncio.gather(*[
task_func(task_input) for _, task_func, task_input in optional_tasks
])
for i, (task_name, _, _) in enumerate(optional_tasks):
results[task_name] = optional_results[i]
return results
Integration with Agents
Workflows can orchestrate multiple agents for complex AI operations:
from agnt5 import workflow, Agent
@workflow
async def content_creation_pipeline(topic: str) -> dict:
"""Create content using multiple specialized agents."""
# Research agent gathers information
researcher = Agent(
name="researcher",
model="gpt-4o",
system_prompt="You research topics and provide comprehensive information."
)
research = await researcher.run(f"Research information about: {topic}")
# Writer agent creates content
writer = Agent(
name="writer",
model="gpt-4o",
system_prompt="You write engaging articles based on research."
)
article = await writer.run(f"Write an article about {topic} using this research: {research}")
# Editor agent reviews and improves content
editor = Agent(
name="editor",
model="gpt-4o",
system_prompt="You edit and improve articles for clarity and engagement."
)
final_article = await editor.run(f"Edit and improve this article: {article}")
return {
"topic": topic,
"research": research,
"draft": article,
"final_article": final_article
}
Multi-Agent Coordination
Coordinate multiple agents with shared context:
class MultiAgentWorkflow(Workflow):
def __init__(self):
super().__init__("multi_agent_coordination")
# Initialize specialized agents
self.analyst = Agent(name="analyst", model="gpt-4o")
self.strategist = Agent(name="strategist", model="gpt-4o")
self.implementer = Agent(name="implementer", model="gpt-4o")
async def run(self, business_challenge: str) -> dict:
"""Coordinate multiple agents to solve business challenge."""
# Step 1: Analysis
analysis = await self.call_agent(
"analyst",
f"Analyze this business challenge: {business_challenge}"
)
# Step 2: Strategy development
strategy = await self.call_agent(
"strategist",
f"Develop strategy based on this analysis: {analysis}"
)
# Step 3: Implementation planning
implementation = await self.call_agent(
"implementer",
f"Create implementation plan for this strategy: {strategy}"
)
return {
"challenge": business_challenge,
"analysis": analysis,
"strategy": strategy,
"implementation_plan": implementation
}
Integration with Tools
Workflows can orchestrate tool usage for external system integration:
from agnt5 import workflow, tool
@tool
def send_email(recipient: str, subject: str, body: str) -> dict:
"""Send email notification."""
return {"sent": True, "recipient": recipient}
@tool
def update_database(record_id: str, data: dict) -> dict:
"""Update database record."""
return {"updated": True, "record_id": record_id}
@tool
def generate_report(data: dict) -> str:
"""Generate formatted report."""
return f"Report generated for {len(data)} items"
@workflow
async def order_completion_workflow(order_id: str) -> dict:
"""Complete order with external system integration."""
# Get order details
order = await get_order_details(order_id)
# Update order status in database
db_result = await update_database(order_id, {"status": "completed"})
# Generate completion report
report = await generate_report(order)
# Send notification email
email_result = await send_email(
recipient=order["customer_email"],
subject=f"Order {order_id} Complete",
body=f"Your order has been completed. {report}"
)
return {
"order_id": order_id,
"database_updated": db_result["updated"],
"report_generated": len(report) > 0,
"notification_sent": email_result["sent"]
}
Error Handling and Recovery
Retry Strategies
Implement custom retry logic for workflow steps:
import asyncio

@workflow
async def resilient_workflow(data: dict) -> dict:
"""Workflow with custom retry strategies."""
max_retries = 3
for attempt in range(max_retries):
try:
# Critical step that might fail
result = await critical_processing_step(data)
break
except TransientError as e:
if attempt == max_retries - 1:
# Last attempt failed, try fallback
result = await fallback_processing(data)
else:
# Wait before retry with exponential backoff
await asyncio.sleep(2 ** attempt)
continue
except PermanentError:
# Permanent error, use fallback immediately
result = await fallback_processing(data)
break
return {"result": result}
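The retry loop can also be factored into a reusable helper so each workflow only decides what happens once retries are exhausted. A sketch with exponential backoff and random jitter, built only on the standard library (TransientError is the same placeholder exception type as above):

import asyncio
import random

async def retry_with_backoff(operation, *args, max_retries: int = 3, base_delay: float = 1.0):
    """Retry an async operation with exponential backoff plus random jitter."""
    for attempt in range(max_retries):
        try:
            return await operation(*args)
        except TransientError:
            if attempt == max_retries - 1:
                raise
            # Wait base_delay * 2^attempt, plus up to 1s of jitter to avoid retry storms
            await asyncio.sleep(base_delay * (2 ** attempt) + random.random())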
Partial Success Handling
Handle scenarios where some steps succeed and others fail:
@workflow
async def partial_success_workflow(items: list) -> dict:
"""Handle partial success gracefully."""
successful = []
failed = []
for item in items:
try:
result = await process_item(item)
successful.append({"item": item, "result": result})
except Exception as e:
failed.append({"item": item, "error": str(e)})
# Continue with successful items
if successful:
summary = await generate_summary([s["result"] for s in successful])
else:
summary = "No items processed successfully"
return {
"successful_count": len(successful),
"failed_count": len(failed),
"successful_items": successful,
"failed_items": failed,
"summary": summary
}
Compensation Patterns
Implement compensation logic for rollback scenarios:
@workflow
async def transactional_workflow(operation_data: dict) -> dict:
"""Workflow with compensation pattern for rollback."""
completed_steps = []
try:
# Step 1: Reserve resources
reservation = await reserve_resources(operation_data)
completed_steps.append(("reserve", reservation))
# Step 2: Process payment
payment = await process_payment(operation_data)
completed_steps.append(("payment", payment))
# Step 3: Fulfill order
fulfillment = await fulfill_order(operation_data)
completed_steps.append(("fulfillment", fulfillment))
return {"success": True, "steps": completed_steps}
except Exception as e:
# Compensate completed steps in reverse order
for step_name, step_result in reversed(completed_steps):
try:
if step_name == "fulfillment":
await cancel_fulfillment(step_result)
elif step_name == "payment":
await refund_payment(step_result)
elif step_name == "reserve":
await release_resources(step_result)
except Exception as comp_error:
# Log compensation failures but continue
log_compensation_error(step_name, comp_error)
return {"success": False, "error": str(e), "compensated": True}
Common Patterns
ETL (Extract, Transform, Load) Pipeline
Implement data pipeline workflows:
import asyncio

@workflow
async def etl_pipeline(source_config: dict) -> dict:
"""Extract, Transform, Load data pipeline."""
# Extract phase
raw_data = await extract_data(source_config)
# Transform phase - process in parallel batches
batch_size = 1000
batches = chunk_data(raw_data, batch_size)
transformed_batches = await asyncio.gather(*[
transform_batch(batch) for batch in batches
])
# Flatten results
transformed_data = [item for batch in transformed_batches for item in batch]
# Load phase
load_result = await load_data(transformed_data, source_config["destination"])
return {
"extracted_count": len(raw_data),
"transformed_count": len(transformed_data),
"loaded_count": load_result["count"],
"pipeline_success": True
}
Approval Workflow
Implement approval processes with human-in-the-loop:
@workflow
async def approval_workflow(request_data: dict) -> dict:
"""Workflow with approval steps."""
# Step 1: Validate request
validation = await validate_request(request_data)
if not validation["valid"]:
return {"approved": False, "reason": validation["errors"]}
# Step 2: Auto-approval for small amounts
if request_data["amount"] < 1000:
approval = await auto_approve(request_data)
else:
# Step 3: Human approval for larger amounts
approval = await request_human_approval(request_data)
# Wait for approval (with timeout)
approval_result = await wait_for_approval(
approval["approval_id"],
timeout=86400 # 24 hours
)
approval = approval_result
# Step 4: Process approved request
if approval["approved"]:
processing_result = await process_approved_request(request_data)
return {
"approved": True,
"approval_id": approval["approval_id"],
"processed": processing_result["success"]
}
else:
return {
"approved": False,
"reason": approval["reason"]
}
Event Processing Workflow
Process events with conditional logic:
@workflow
async def event_processing_workflow(event: dict) -> dict:
"""Process events based on type and content."""
event_type = event.get("type")
event_data = event.get("data", {})
# Common validation for all events
validation = await validate_event_format(event)
if not validation["valid"]:
return {"processed": False, "error": validation["error"]}
# Type-specific processing
if event_type == "user_action":
result = await process_user_action(event_data)
elif event_type == "system_alert":
result = await process_system_alert(event_data)
elif event_type == "business_event":
result = await process_business_event(event_data)
else:
result = await process_unknown_event(event)
# Common post-processing
await log_event_processing(event, result)
await update_metrics(event_type, result["success"])
return {
"event_id": event.get("id"),
"event_type": event_type,
"processed": result["success"],
"result": result
}
Performance Optimization
Workflow Batching
Process multiple similar workflows efficiently:
import asyncio

@workflow
async def batch_workflow_processor(workflow_requests: list) -> list:
"""Process multiple similar workflows efficiently."""
# Group requests by type for batch processing
grouped_requests = group_by_type(workflow_requests)
results = []
for request_type, requests in grouped_requests.items():
if request_type == "data_processing":
# Process data requests in parallel
batch_results = await asyncio.gather(*[
data_processing_workflow(req) for req in requests
])
elif request_type == "notification":
# Batch notifications for efficiency
batch_results = await batch_notification_workflow(requests)
else:
# Process other types individually
batch_results = await asyncio.gather(*[
generic_workflow(req) for req in requests
])
results.extend(batch_results)
return results
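The example assumes a group_by_type helper. A minimal sketch, assuming each request dict carries a "type" field:

from collections import defaultdict

def group_by_type(requests: list[dict]) -> dict[str, list[dict]]:
    """Group workflow requests by their "type" field."""
    grouped: dict[str, list[dict]] = defaultdict(list)
    for request in requests:
        grouped[request.get("type", "unknown")].append(request)
    return dict(grouped)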
Resource Management
Manage resources efficiently across workflow steps:
@workflow
async def resource_managed_workflow(data: dict) -> dict:
"""Workflow with efficient resource management."""
# Acquire shared resources at workflow level
async with acquire_database_connection() as db:
async with acquire_api_client() as api_client:
# Step 1: Data retrieval (reuse connections)
raw_data = await fetch_data_from_db(db, data["query"])
# Step 2: External enrichment (reuse API client)
enriched_data = await enrich_with_api(api_client, raw_data)
# Step 3: Process and store (reuse connections)
processed = await process_data(enriched_data)
result = await store_results(db, processed)
return {"processed_count": len(processed), "stored": result}
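acquire_database_connection and acquire_api_client are placeholders for your own resource managers. One way to build such a manager is contextlib.asynccontextmanager from the standard library; a sketch assuming a hypothetical async database driver:

from contextlib import asynccontextmanager

@asynccontextmanager
async def acquire_database_connection():
    """Open a connection for the duration of the workflow and always close it."""
    connection = await open_connection()  # hypothetical async driver call
    try:
        yield connection
    finally:
        await connection.close()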
Best Practices
Workflow Design:
- Define clear input/output interfaces
- Design workflows to be idempotent so they can be safely rerun (see the sketch after this list)
- Balance durability with performance
- Minimize shared state between tasks
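A minimal sketch of the rerun-safety idea: derive a deterministic key from the input and skip work that has already been recorded (the result store functions are hypothetical placeholders):

import hashlib
import json

@workflow
async def idempotent_import(record: dict) -> dict:
    """Safe to rerun: the same input maps to the same key and is only imported once."""
    key = hashlib.sha256(json.dumps(record, sort_keys=True).encode()).hexdigest()
    existing = await lookup_result(key)  # hypothetical result store lookup
    if existing is not None:
        return existing
    result = await import_record(record)  # hypothetical import task
    await save_result(key, result)  # hypothetical result store write
    return result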
Error Handling:
- Handle partial failures appropriately
- Configure retries based on error types
- Implement rollback for transactional operations
- Include comprehensive logging and metrics
Performance:
- Use parallel patterns where possible
- Share connections and resources efficiently
- Group similar operations for efficiency
- Handle large datasets appropriately
Maintainability:
- Document workflow purpose and steps clearly
- Break complex workflows into smaller components
- Include unit and integration tests (see the test sketch after this list)
- Implement observability for production workflows
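Because the workflows above are awaited like ordinary async functions, they can be unit tested the same way. A minimal sketch with the pytest-asyncio plugin, testing the simple_workflow defined earlier (assuming task1 through task3 are importable or stubbed in the test):

import pytest

@pytest.mark.asyncio
async def test_simple_workflow_returns_final_result():
    result = await simple_workflow("hello")
    assert "final_result" in result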
Next Steps
Workflows provide the orchestration layer for complex operations. Learn how they integrate with other AGNT5 components:
- Tasks - Build the durable functions workflows orchestrate
- Agents - Add AI decision-making to workflow steps
- Tools - Connect workflows to external systems
- Entities - Maintain stateful context across workflow executions
Ready to orchestrate your first workflow? Check out the Quick Start Guide to get started building complex operations.