# Tasks

Durable functions that automatically recover from failures, with exactly-once execution guarantees.

Tasks are the fundamental building blocks of AGNT5. They transform any Python function into a durable operation that automatically recovers from failures. Every task provides exactly-once execution guarantees: your code completes successfully exactly once, even during infrastructure failures.
## What Tasks Do

Tasks wrap your functions with durability, validation, and monitoring capabilities (see the minimal example after this list):

- Execute any Python function with automatic failure recovery
- Process classification tasks with LLMs and structured output validation
- Handle data transformations with built-in retry mechanisms
- Make API calls that survive network failures and service outages
- Perform complex computations with progress checkpointing
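At its simplest, a task is just a decorated function. A minimal sketch (the function and field names are illustrative, not part of the SDK):

```python
from agnt5 import task

@task
def summarize_order(order: dict) -> dict:
    """A plain function, made durable by the @task decorator."""
    total = sum(item["price"] for item in order["items"])
    return {"order_id": order["id"], "total": total}
```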
## Key Features

### Automatic Recovery

Tasks resume from exactly where they failed, without reprocessing completed work:

```python
from agnt5 import task

@task
def process_large_dataset(data_path: str) -> dict:
    """Process dataset - automatically resumes on failure."""
    results = []
    # Each iteration is checkpointed
    for file in get_files(data_path):
        processed = expensive_computation(file)  # Survives failures
        results.append(processed)
    return {"processed_count": len(results)}
```
### State Persistence

Task progress and intermediate results are automatically saved:

```python
@task
def multi_step_analysis(input_data: list) -> dict:
    """Multi-step analysis with persistent state."""
    # Step 1: Data cleaning (state saved)
    cleaned = clean_data(input_data)

    # Step 2: Feature extraction (resumes from here if step 1 completed)
    features = extract_features(cleaned)

    # Step 3: Model training (resumes from here if steps 1-2 completed)
    model = train_model(features)

    return {"accuracy": model.score}
```
### Result Validation

Define success criteria and output formats with automatic validation:

```python
from pydantic import BaseModel

from agnt5 import task, pydantic_task, OutputFormat

@task(output_format=OutputFormat.JSON)
def extract_customer_info(text: str) -> dict:
    """Extract structured customer data from text."""
    # Task automatically validates JSON output format
    return {
        "name": extract_name(text),
        "email": extract_email(text),
        "phone": extract_phone(text)
    }

# Using Pydantic models for strict validation
class CustomerData(BaseModel):
    name: str
    email: str

@pydantic_task(model_class=CustomerData)
def extract_validated_customer(text: str) -> CustomerData:
    """Extract customer data with Pydantic validation."""
    return CustomerData(
        name=extract_name(text),
        email=extract_email(text)
    )
```
### Timeout Handling

Configure execution limits to prevent runaway tasks:

```python
@task(timeout=300.0)  # 5 minutes maximum
def time_limited_processing(data: str) -> str:
    """Processing with timeout protection."""
    return expensive_operation(data)
```
### Error Boundaries

Tasks isolate failures to prevent cascading effects:

```python
import requests

@task(retry_count=5)
def resilient_api_call(endpoint: str) -> dict:
    """API call with automatic retries and exponential backoff."""
    response = requests.get(endpoint)
    response.raise_for_status()
    return response.json()
```
## Creating Tasks

### Basic Task

The simplest way to create a durable function:

```python
from agnt5 import task

@task
def calculate_score(data: dict) -> float:
    """Calculate score from input data."""
    return sum(data.values()) / len(data)

# Execute the task (decorated tasks are awaitable)
result = await calculate_score({"a": 10, "b": 20, "c": 30})
print(result)  # 20.0
```
### Task with Configuration

Configure retry behavior, timeouts, and output validation:

```python
from datetime import datetime

from agnt5 import task, OutputFormat

@task(
    name="data_processor",
    description="Process incoming data with validation",
    output_format=OutputFormat.JSON,
    retry_count=3,
    timeout=60.0
)
def process_data(input_data: str) -> dict:
    """Process data with comprehensive error handling."""
    processed = complex_transformation(input_data)
    return {"result": processed, "timestamp": datetime.now().isoformat()}
```
### JSON Extraction Task

Extract structured data with automatic JSON validation:

```python
from agnt5 import json_extraction_task

PROFILE_SCHEMA = {
    "name": "string",
    "age": "number",
    "skills": "array"
}

@json_extraction_task(schema=PROFILE_SCHEMA)
def extract_profile(resume_text: str) -> dict:
    """Extract structured profile from resume text."""
    # LLM integration for extraction
    return llm_extract(resume_text, PROFILE_SCHEMA)
```
### Pydantic Task

Use Pydantic models for strict type validation:

```python
from pydantic import BaseModel

from agnt5 import pydantic_task

class UserProfile(BaseModel):
    name: str
    email: str
    age: int
    skills: list[str]

@pydantic_task(model_class=UserProfile)
def extract_user_profile(text: str) -> UserProfile:
    """Extract user profile with Pydantic validation."""
    extracted = llm_extract(text)
    return UserProfile(**extracted)
```
## Running Tasks

### Direct Execution

Run tasks directly and get the result:

```python
# Simple execution
result = await calculate_score({"x": 100, "y": 200})

# With error handling
try:
    result = await process_data("input text")
    print(f"Success: {result}")
except Exception as e:
    print(f"Task failed: {e}")
```
### Task Result Metadata

Get detailed execution information:

```python
# Run with metadata
task_result = await calculate_score.run({"x": 100, "y": 200})

print(f"Result: {task_result.result}")
print(f"Duration: {task_result.duration_ms}ms")
print(f"Success: {task_result.success}")
print(f"Attempt: {task_result.attempt_count}")
```
### Async Task Execution

Tasks are fully async-compatible:

```python
import asyncio

async def process_batch(data_list: list):
    """Process multiple items concurrently."""
    tasks = [process_data(item) for item in data_list]
    results = await asyncio.gather(*tasks)
    return results
```
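From synchronous code, the batch can be kicked off with `asyncio.run` as usual; a minimal sketch (the sample inputs are illustrative):

```python
# Run the async batch from a synchronous entry point
results = asyncio.run(process_batch(["first record", "second record"]))
print(results)
```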
## Integration with Workflows

Tasks work seamlessly within workflows for complex orchestration:

```python
from agnt5 import workflow, task

@task
def validate_input(data: dict) -> dict:
    """Validate and clean input data."""
    return clean_data(data)

@task
def process_step(data: dict) -> dict:
    """Process validated data."""
    return enrich_data(data)

@workflow
async def data_pipeline(raw_data: dict) -> dict:
    """Complete data processing pipeline."""
    # Each step is durable and resumable
    validated = await validate_input(raw_data)
    processed = await process_step(validated)
    return processed
```
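A minimal invocation sketch, assuming workflows are awaited the same way as tasks (the payload is illustrative):

```python
# Invoke the workflow like any other awaitable
result = await data_pipeline({"customer_id": "c-123", "amount": 42})
```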
## Integration with Agents

Tasks can be enhanced with AI capabilities through agents:

```python
from agnt5 import task, Agent

@task
def ai_classification(text: str) -> str:
    """Classify text using an AI agent."""
    agent = Agent(
        name="classifier",
        model="gpt-4o",
        system_prompt="You classify text into categories."
    )
    response = agent.run(f"Classify this text: {text}")
    return response
```
## Advanced Features

### Custom Durability Context

Access the durable execution context for advanced control:

```python
@task(enable_durability=True)
async def advanced_task(ctx, data: str) -> dict:
    """Task with direct durability context access."""
    # Save intermediate state
    await ctx.state.set("step1_complete", True)

    # Make durable external calls
    result = await ctx.call("external_service", "process", data)

    # Durable sleep
    await ctx.sleep(30.0)

    return {"result": result}
```
### Task Composition

Combine tasks for complex operations:

```python
@task
def step1(data: str) -> str:
    return f"processed_{data}"

@task
def step2(data: str) -> str:
    return f"transformed_{data}"

@task
async def composed_task(input_data: str) -> str:
    """Compose multiple tasks."""
    result1 = await step1(input_data)
    result2 = await step2(result1)
    return result2
```
### Error Handling Strategies

Implement custom error handling and recovery:

```python
@task(retry_count=3)
def robust_processing(data: str) -> dict:
    """Processing with custom error handling."""
    try:
        return primary_processing(data)
    except SpecificError:
        # Fallback processing
        return fallback_processing(data)
    except Exception as e:
        # Log and re-raise for retry
        log_error(e)
        raise
```
## Common Patterns

### Data Processing Pipeline

Transform data through multiple stages, chained in the sketch after these definitions:
```python
@task
def extract_data(source: str) -> list:
    """Extract data from source."""
    return load_from_source(source)

@task
def transform_data(data: list) -> list:
    """Transform extracted data."""
    return [transform_item(item) for item in data]

@task
def load_data(data: list, destination: str) -> dict:
    """Load transformed data to destination."""
    save_to_destination(data, destination)
    return {"loaded_count": len(data)}
```
### External API Integration

Make resilient API calls:

```python
import requests

@task(retry_count=5, timeout=30.0)
def fetch_user_data(user_id: str) -> dict:
    """Fetch user data from external API."""
    response = requests.get(f"https://api.example.com/users/{user_id}")
    response.raise_for_status()
    return response.json()

@task
async def enrich_user_profile(user_id: str) -> dict:
    """Enrich user profile with multiple data sources."""
    base_data = await fetch_user_data(user_id)
    preferences = await fetch_user_preferences(user_id)
    activity = await fetch_user_activity(user_id)

    return {
        **base_data,
        "preferences": preferences,
        "recent_activity": activity
    }
```
### AI-Powered Processing

Combine tasks with AI for intelligent processing:

```python
import json

from agnt5 import task, Agent

@task
def analyze_sentiment(text: str) -> dict:
    """Analyze sentiment using AI."""
    agent = Agent(name="sentiment-analyzer", model="gpt-4o")
    response = agent.run(
        f"Analyze the sentiment of this text and return a JSON object "
        f"with 'sentiment' (positive/negative/neutral) and 'confidence' (0-1): {text}"
    )
    return json.loads(response)

@task
async def process_feedback_batch(feedback_list: list) -> dict:
    """Process a batch of customer feedback."""
    results = []
    for feedback in feedback_list:
        sentiment = await analyze_sentiment(feedback["text"])
        results.append({
            "feedback_id": feedback["id"],
            "sentiment": sentiment["sentiment"],
            "confidence": sentiment["confidence"]
        })
    return {"processed": len(results), "results": results}
```
## Performance Considerations

### Optimize Task Granularity

Balance durability overhead with operation size:

```python
# Too fine-grained (high overhead)
@task
def add_numbers(a: int, b: int) -> int:
    return a + b

# Well-balanced (good durability/performance ratio)
@task
def process_user_batch(user_ids: list) -> dict:
    """Process batch of users efficiently."""
    results = []
    for user_id in user_ids:
        result = process_single_user(user_id)  # Non-durable helper
        results.append(result)
    return {"processed": len(results)}
```
### Memory Management

Handle large datasets efficiently:

```python
@task
def process_large_file(file_path: str) -> dict:
    """Process large file in chunks."""
    total_processed = 0

    # Process in chunks to manage memory
    for chunk in read_file_chunks(file_path, chunk_size=1000):
        processed_chunk = process_chunk(chunk)
        total_processed += len(processed_chunk)

        # Optional: save progress periodically
        if total_processed % 10000 == 0:
            save_progress(total_processed)

    return {"total_processed": total_processed}
```
## Best Practices

Task Design:
- Give each task one clear purpose
- Keep tasks deterministic: the same input should produce the same output
- Balance durability benefits against performance overhead
- Use type hints and descriptive docstrings

Error Handling:
- Catch and handle specific error types appropriately
- Configure retry counts to match the operation's failure characteristics
- Implement fallback strategies where possible
- Log errors and metrics for operational visibility

State Management (see the sketch after this list):
- Avoid modifying input parameters
- Manage task state consistently
- Save intermediate results during long-running operations
- Clean up resources properly in error scenarios
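A sketch pulling several of these guidelines together; `open_connection` and `store_records` are hypothetical helpers, not part of the SDK:

```python
@task(retry_count=3)
def normalize_names(records: list) -> dict:
    """Deterministic task that leaves its input untouched."""
    # Work on a copy instead of mutating the input parameter
    cleaned = [dict(record) for record in records]

    connection = open_connection()  # hypothetical resource helper
    try:
        for record in cleaned:
            record["name"] = record.get("name", "").strip().lower()
        store_records(connection, cleaned)  # hypothetical persistence helper
        return {"normalized": len(cleaned)}
    finally:
        connection.close()  # release the resource even if an attempt fails
```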
## Next Steps

Tasks provide the foundation for building resilient applications. Explore how they integrate with other AGNT5 components:

- Workflows - Orchestrate multiple tasks with complex logic
- Agents - Add AI capabilities to your tasks
- Tools - Extend task functionality with external integrations
- Entities - Maintain stateful context across task executions

Ready to build your first durable task? Check out the Quick Start Guide to get started in 15 minutes.