Tasks

Durable functions that automatically recover from failures with exactly-once execution guarantees


Tasks are the fundamental building blocks of AGNT5. They transform any Python function into a durable operation that automatically recovers from failures. Every task provides exactly-once execution guarantees: your code completes successfully exactly once, even during infrastructure failures.

What Tasks Do

Tasks wrap your functions with durability, validation, and monitoring capabilities:

  • Execute any Python function with automatic failure recovery
  • Process classification tasks with LLMs and structured output validation
  • Handle data transformations with built-in retry mechanisms
  • Make API calls that survive network failures and service outages
  • Perform complex computations with progress checkpointing

Key Features

Automatic Recovery

Tasks resume from exactly where they failed, without reprocessing completed work:

from agnt5 import task

@task
def process_large_dataset(data_path: str) -> dict:
    """Process dataset - automatically resumes on failure."""
    results = []
    
    # Each iteration is checkpointed
    for file in get_files(data_path):
        processed = expensive_computation(file)  # Survives failures
        results.append(processed)
    
    return {"processed_count": len(results)}

State Persistence

Task progress and intermediate results are automatically saved:

@task
def multi_step_analysis(input_data: list) -> dict:
    """Multi-step analysis with persistent state."""
    # Step 1: Data cleaning (state saved)
    cleaned = clean_data(input_data)
    
    # Step 2: Feature extraction (resumes from here if step 1 completed)  
    features = extract_features(cleaned)
    
    # Step 3: Model training (resumes from here if steps 1-2 completed)
    model = train_model(features)
    
    return {"accuracy": model.score}

Result Validation

Define success criteria and output formats with automatic validation:

from agnt5 import task, pydantic_task, OutputFormat

@task(output_format=OutputFormat.JSON)
def extract_customer_info(text: str) -> dict:
    """Extract structured customer data from text."""
    # Task automatically validates JSON output format
    return {
        "name": extract_name(text),
        "email": extract_email(text), 
        "phone": extract_phone(text)
    }

# Using Pydantic models for strict validation
from pydantic import BaseModel

class CustomerData(BaseModel):
    name: str
    email: str

@pydantic_task(model_class=CustomerData)
def extract_validated_customer(text: str) -> CustomerData:
    """Extract customer data with Pydantic validation."""
    return CustomerData(
        name=extract_name(text),
        email=extract_email(text)
    )

Timeout Handling

Configure execution limits to prevent runaway tasks:

@task(timeout=300.0)  # 5 minutes maximum
def time_limited_processing(data: str) -> str:
    """Processing with timeout protection."""
    return expensive_operation(data)

Error Boundaries

Tasks isolate failures to prevent cascade effects:

import requests

@task(retry_count=5)
def resilient_api_call(endpoint: str) -> dict:
    """API call with automatic retries and exponential backoff."""
    response = requests.get(endpoint)
    response.raise_for_status()
    return response.json()

Creating Tasks

Basic Task

The simplest way to create a durable function:

from agnt5 import task

@task
def calculate_score(data: dict) -> float:
    """Calculate score from input data."""
    return sum(data.values()) / len(data)

# Execute the task
result = await calculate_score({"a": 10, "b": 20, "c": 30})
print(result)  # 20.0

Task with Configuration

Configure retry behavior, timeouts, and output validation:

from datetime import datetime
from agnt5 import OutputFormat, task

@task(
    name="data_processor",
    description="Process incoming data with validation",
    output_format=OutputFormat.JSON,
    retry_count=3,
    timeout=60.0
)
def process_data(input_data: str) -> dict:
    """Process data with comprehensive error handling."""
    processed = complex_transformation(input_data)
    return {"result": processed, "timestamp": datetime.now().isoformat()}

JSON Extraction Task

Extract structured data with automatic JSON validation:

from agnt5 import json_extraction_task

PROFILE_SCHEMA = {
    "name": "string",
    "age": "number",
    "skills": "array"
}

@json_extraction_task(schema=PROFILE_SCHEMA)
def extract_profile(resume_text: str) -> dict:
    """Extract structured profile from resume text."""
    # LLM integration for extraction
    return llm_extract(resume_text, PROFILE_SCHEMA)

Pydantic Task

Use Pydantic models for strict type validation:

from pydantic import BaseModel

class UserProfile(BaseModel):
    name: str
    email: str
    age: int
    skills: list[str]

@pydantic_task(model_class=UserProfile)
def extract_user_profile(text: str) -> UserProfile:
    """Extract user profile with Pydantic validation."""
    extracted = llm_extract(text)
    return UserProfile(**extracted)
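
Because the function constructs the model directly, malformed extractions raise pydantic's ValidationError. A sketch of handling it at the call site, assuming AGNT5 surfaces the exception unchanged (resume_text is an illustrative input):

from pydantic import ValidationError

try:
    profile = await extract_user_profile(resume_text)
    print(f"Extracted: {profile.name} <{profile.email}>")
except ValidationError as e:
    print(f"Extraction did not match UserProfile: {e}")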

Running Tasks

Direct Execution

Run tasks directly and get the result:

# Simple execution
result = await calculate_score({"x": 100, "y": 200})

# With error handling
try:
    result = await process_data("input text")
    print(f"Success: {result}")
except Exception as e:
    print(f"Task failed: {e}")

Task Result Metadata

Get detailed execution information:

# Run with metadata
task_result = await calculate_score.run({"x": 100, "y": 200})

print(f"Result: {task_result.result}")
print(f"Duration: {task_result.duration_ms}ms")
print(f"Success: {task_result.success}")
print(f"Attempt: {task_result.attempt_count}")

Async Task Execution

Tasks are fully async-compatible:

import asyncio

async def process_batch(data_list: list):
    """Process multiple items concurrently."""
    tasks = [process_data(item) for item in data_list]
    results = await asyncio.gather(*tasks)
    return results
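
A sketch of running the batch from synchronous code with asyncio.run (the input items are illustrative):

results = asyncio.run(process_batch(["alpha", "beta", "gamma"]))
print(f"Processed {len(results)} items")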

Integration with Workflows

Tasks work seamlessly within workflows for complex orchestration:

from agnt5 import workflow, task

@task
def validate_input(data: dict) -> dict:
    """Validate and clean input data."""
    cleaned_data = clean_and_validate(data)  # Non-durable helper
    return cleaned_data

@task
def process_step(data: dict) -> dict:
    """Process validated data."""
    processed_result = apply_processing(data)  # Non-durable helper
    return processed_result

@workflow
async def data_pipeline(raw_data: dict) -> dict:
    """Complete data processing pipeline."""
    # Each step is durable and resumable
    validated = await validate_input(raw_data)
    processed = await process_step(validated)
    return processed

Integration with Agents

Tasks can be enhanced with AI capabilities through agents:

from agnt5 import task, Agent

@task
def ai_classification(text: str) -> str:
    """Classify text using AI agent."""
    agent = Agent(
        name="classifier",
        model="gpt-4o",
        system_prompt="You classify text into categories."
    )
    
    response = agent.run(f"Classify this text: {text}")
    return response

Advanced Features

Custom Durability Context

Access the durable execution context for advanced control:

@task(enable_durability=True)
async def advanced_task(ctx, data: str) -> dict:
    """Task with direct durability context access."""
    # Save intermediate state
    await ctx.state.set("step1_complete", True)
    
    # Make durable external calls
    result = await ctx.call("external_service", "process", data)
    
    # Durable sleep
    await ctx.sleep(30.0)
    
    return {"result": result}

Task Composition

Combine tasks for complex operations:

@task
def step1(data: str) -> str:
    return f"processed_{data}"

@task  
def step2(data: str) -> str:
    return f"transformed_{data}"

@task
async def composed_task(input_data: str) -> str:
    """Compose multiple tasks."""
    result1 = await step1(input_data)
    result2 = await step2(result1)
    return result2

Error Handling Strategies

Implement custom error handling and recovery:

@task(retry_count=3)
def robust_processing(data: str) -> dict:
    """Processing with custom error handling."""
    try:
        return primary_processing(data)
    except SpecificError:
        # Fallback processing
        return fallback_processing(data)
    except Exception as e:
        # Log and re-raise for retry
        log_error(e)
        raise

Common Patterns

Data Processing Pipeline

Transform data through multiple stages:

@task
def extract_data(source: str) -> list:
    """Extract data from source."""
    return load_from_source(source)

@task
def transform_data(data: list) -> list:
    """Transform extracted data.""" 
    return [transform_item(item) for item in data]

@task
def load_data(data: list, destination: str) -> dict:
    """Load transformed data to destination."""
    save_to_destination(data, destination)
    return {"loaded_count": len(data)}

External API Integration

Make resilient API calls:

@task(retry_count=5, timeout=30.0)
def fetch_user_data(user_id: str) -> dict:
    """Fetch user data from external API."""
    response = requests.get(f"https://api.example.com/users/{user_id}")
    response.raise_for_status()
    return response.json()

@task
async def enrich_user_profile(user_id: str) -> dict:
    """Enrich user profile with multiple data sources."""
    base_data = await fetch_user_data(user_id)
    preferences = await fetch_user_preferences(user_id)
    activity = await fetch_user_activity(user_id)
    
    return {
        **base_data,
        "preferences": preferences,
        "recent_activity": activity
    }
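
Because the three fetches are independent, they can also run concurrently with asyncio.gather, following the Async Task Execution pattern above; a sketch:

import asyncio

@task
async def enrich_user_profile_concurrent(user_id: str) -> dict:
    """Fetch all sources concurrently, then merge the results."""
    base_data, preferences, activity = await asyncio.gather(
        fetch_user_data(user_id),
        fetch_user_preferences(user_id),
        fetch_user_activity(user_id),
    )
    return {
        **base_data,
        "preferences": preferences,
        "recent_activity": activity,
    }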

AI-Powered Processing

Combine tasks with AI for intelligent processing:

import json

@task
def analyze_sentiment(text: str) -> dict:
    """Analyze sentiment using AI."""
    agent = Agent(name="sentiment-analyzer", model="gpt-4o")
    
    response = agent.run(
        f"Analyze the sentiment of this text and return a JSON object "
        f"with 'sentiment' (positive/negative/neutral) and 'confidence' (0-1): {text}"
    )
    
    return json.loads(response)

@task
async def process_feedback_batch(feedback_list: list) -> dict:
    """Process batch of customer feedback."""
    results = []
    
    for feedback in feedback_list:
        sentiment = await analyze_sentiment(feedback["text"])
        results.append({
            "feedback_id": feedback["id"],
            "sentiment": sentiment["sentiment"],
            "confidence": sentiment["confidence"]
        })
    
    return {"processed": len(results), "results": results}

Performance Considerations

Optimize Task Granularity

Balance durability overhead with operation size:

# Too fine-grained (high overhead)
@task
def add_numbers(a: int, b: int) -> int:
    return a + b

# Well-balanced (good durability/performance ratio)
@task
def process_user_batch(user_ids: list) -> dict:
    """Process batch of users efficiently."""
    results = []
    for user_id in user_ids:
        result = process_single_user(user_id)  # Non-durable helper
        results.append(result)
    return {"processed": len(results)}

Memory Management

Handle large datasets efficiently:

@task
def process_large_file(file_path: str) -> dict:
    """Process large file in chunks."""
    total_processed = 0
    
    # Process in chunks to manage memory
    for chunk in read_file_chunks(file_path, chunk_size=1000):
        processed_chunk = process_chunk(chunk)
        total_processed += len(processed_chunk)
        
        # Optional: Save progress periodically
        if total_processed % 10000 == 0:
            save_progress(total_processed)
    
    return {"total_processed": total_processed}

Best Practices

Task Design:

  • Each task should have one clear purpose
  • Keep tasks deterministic: the same input should produce the same output, so retries and replays are safe (see the sketch after this list)
  • Balance durability benefits with performance overhead
  • Use type hints and descriptive docstrings
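
To illustrate the determinism point: values that change between runs, such as timestamps, are best passed in as parameters so a retry replays the same computation (a sketch; helper names are illustrative):

from datetime import datetime

# Non-deterministic: each retry produces a different timestamp
@task
def stamp_record_unsafe(record: dict) -> dict:
    return {**record, "processed_at": datetime.now().isoformat()}

# Deterministic: the caller fixes the timestamp once, so retries replay it
@task
def stamp_record(record: dict, processed_at: str) -> dict:
    return {**record, "processed_at": processed_at}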

Error Handling:

  • Catch and handle specific error types appropriately
  • Configure retry counts based on operation characteristics
  • Implement fallback strategies when possible
  • Log errors and metrics for operational visibility

State Management:

  • Avoid modifying input parameters
  • Ensure task state is consistently managed
  • Save intermediate results for long-running operations
  • Properly clean up resources in error scenarios, as sketched below
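
A minimal cleanup sketch using try/finally (open_resource and do_work are hypothetical helpers):

@task
def process_with_cleanup(path: str) -> dict:
    """Release the resource even when processing fails and the task retries."""
    handle = open_resource(path)  # Hypothetical helper
    try:
        return {"result": do_work(handle)}  # Hypothetical helper
    finally:
        handle.close()  # Runs on success and on failure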

Next Steps

Tasks provide the foundation for building resilient applications. Explore how they integrate with other AGNT5 components:

  • Workflows - Orchestrate multiple tasks with complex logic
  • Agents - Add AI capabilities to your tasks
  • Tools - Extend task functionality with external integrations
  • Entities - Maintain stateful context across task executions

Ready to build your first durable task? Check out the Quick Start Guide to get started in 15 minutes.