Tools

Extend functionality by providing access to external systems, APIs, and custom functions that bridge workflows with the real world


Tools extend functionality by providing access to external systems, APIs, and custom functions.

They bridge your workflows with the real world, enabling agents and tasks to interact with databases, APIs, file systems, and any external service.

Every tool provides automatic schema generation, validation, and error handling with built-in retry mechanisms.

What Tools Enable

Tools connect your AGNT5 applications to external systems and capabilities:

  • Connect to databases and external APIs with automatic retry logic
  • Execute custom functions with parameter validation and error handling
  • Process files, images, and media with type-safe interfaces
  • Send notifications and webhooks with delivery guarantees
  • Integrate with existing systems and infrastructure seamlessly

Key Features

Function Integration

Convert any Python function into a tool with automatic schema generation:

from agnt5 import tool

@tool
def calculate_compound_interest(principal: float, rate: float, time: int, compounds_per_year: int = 1) -> float:
    """Calculate compound interest over time.
    
    Args:
        principal: Initial investment amount
        rate: Annual interest rate (as decimal, e.g., 0.05 for 5%)
        time: Time period in years
        compounds_per_year: Number of times interest compounds per year
        
    Returns:
        Final amount after compound interest
    """
    return principal * (1 + rate / compounds_per_year) ** (compounds_per_year * time)

# Tool automatically generates schema for agents
schema = calculate_compound_interest.get_schema()
# Returns OpenAI function schema for agent integration

API Connectivity

Connect to REST APIs and web services with built-in resilience:

import requests
from typing import Dict, Any

@tool(timeout=30.0, max_retries=3)
def fetch_weather_data(city: str, api_key: str) -> Dict[str, Any]:
    """Fetch current weather data for a city.
    
    Args:
        city: Name of the city
        api_key: Weather API key
        
    Returns:
        Weather data including temperature, humidity, conditions
    """
    url = f"https://api.weather.com/v1/current"
    params = {"city": city, "key": api_key}
    
    response = requests.get(url, params=params)
    response.raise_for_status()
    
    data = response.json()
    return {
        "city": city,
        "temperature": data["current"]["temp_f"],
        "humidity": data["current"]["humidity"],
        "conditions": data["current"]["condition"]["text"],
        "timestamp": data["current"]["last_updated"]
    }

# Automatic retry on network failures
weather = await fetch_weather_data("San Francisco", "your-api-key")

System Access

Controlled access to databases and file systems:

import sqlite3
from pathlib import Path

@tool
def query_database(query: str, database_path: str) -> list[dict]:
    """Execute SQL query and return results.
    
    Args:
        query: SQL query to execute
        database_path: Path to SQLite database
        
    Returns:
        List of rows as dictionaries
    """
    conn = sqlite3.connect(database_path)
    conn.row_factory = sqlite3.Row  # Return rows as dictionaries
    
    try:
        cursor = conn.execute(query)
        results = [dict(row) for row in cursor.fetchall()]
        return results
    finally:
        conn.close()

@tool
def read_file_content(file_path: str, encoding: str = "utf-8") -> str:
    """Read content from a file.
    
    Args:
        file_path: Path to the file to read
        encoding: File encoding (default: utf-8)
        
    Returns:
        File content as string
    """
    path = Path(file_path)
    if not path.exists():
        raise FileNotFoundError(f"File not found: {file_path}")
    
    return path.read_text(encoding=encoding)

@tool
def write_file_content(file_path: str, content: str, encoding: str = "utf-8") -> dict:
    """Write content to a file.
    
    Args:
        file_path: Path to the file to write
        content: Content to write
        encoding: File encoding (default: utf-8)
        
    Returns:
        Result with file info
    """
    path = Path(file_path)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(content, encoding=encoding)
    
    return {
        "file_path": str(path),
        "size_bytes": path.stat().st_size,
        "created": True
    }

Webhook Handling

Process inbound and send outbound webhooks:

import json
from datetime import datetime

@tool
def send_webhook(url: str, payload: dict, headers: dict = None) -> dict:
    """Send webhook to external system.
    
    Args:
        url: Webhook endpoint URL
        payload: Data to send
        headers: Optional HTTP headers
        
    Returns:
        Response status and details
    """
    import requests
    
    headers = headers or {"Content-Type": "application/json"}
    
    response = requests.post(
        url, 
        json=payload, 
        headers=headers,
        timeout=30
    )
    
    return {
        "status_code": response.status_code,
        "success": response.status_code < 400,
        "response_data": response.json() if response.headers.get("content-type", "").startswith("application/json") else response.text,
        "timestamp": datetime.now().isoformat()
    }

@tool
def process_webhook_event(event_data: dict) -> dict:
    """Process incoming webhook event.
    
    Args:
        event_data: Webhook event payload
        
    Returns:
        Processing result
    """
    event_type = event_data.get("type")
    timestamp = event_data.get("timestamp")
    
    if event_type == "payment.completed":
        return process_payment_webhook(event_data)
    elif event_type == "user.created":
        return process_user_webhook(event_data)
    else:
        return {
            "processed": False,
            "error": f"Unknown event type: {event_type}",
            "timestamp": timestamp
        }

Custom Implementations

Build domain-specific tools for your use case:

import os
from typing import List
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

@tool
def send_email(
    to_emails: List[str], 
    subject: str, 
    body: str, 
    from_email: str,
    smtp_server: str = "smtp.gmail.com",
    smtp_port: int = 587
) -> dict:
    """Send email notification.
    
    Args:
        to_emails: List of recipient email addresses
        subject: Email subject
        body: Email body content
        from_email: Sender email address
        smtp_server: SMTP server hostname
        smtp_port: SMTP server port
        
    Returns:
        Delivery status for each recipient
    """
    msg = MIMEMultipart()
    msg['From'] = from_email
    msg['Subject'] = subject
    msg.attach(MIMEText(body, 'plain'))
    
    results = []
    
    try:
        server = smtplib.SMTP(smtp_server, smtp_port)
        server.starttls()
        # Note: Use environment variables for credentials in production
        server.login(from_email, os.getenv("EMAIL_PASSWORD"))
        
        for email in to_emails:
            try:
                msg['To'] = email
                server.send_message(msg)
                results.append({"email": email, "sent": True, "error": None})
                del msg['To']  # Remove for next iteration
            except Exception as e:
                results.append({"email": email, "sent": False, "error": str(e)})
        
        server.quit()
        
    except Exception as e:
        # If server connection fails, mark all as failed
        for email in to_emails:
            results.append({"email": email, "sent": False, "error": str(e)})
    
    return {
        "total_emails": len(to_emails),
        "successful": sum(1 for r in results if r["sent"]),
        "failed": sum(1 for r in results if not r["sent"]),
        "results": results
    }

Creating Tools

Basic Tool

Convert any function into a tool:

from agnt5 import tool

@tool
def add_numbers(a: float, b: float) -> float:
    """Add two numbers together."""
    return a + b

# Use directly
result = await add_numbers(10, 20)  # Returns 30

# Use with agents
from agnt5 import Agent
agent = Agent(name="math_agent", tools=[add_numbers])
response = await agent.run("What's 15 plus 25?")  # Agent uses the tool

Tool with Configuration

Configure retry behavior, timeouts, and durability:

@tool(
    name="external_api_call",
    description="Call external API with retry logic",
    timeout=60.0,
    max_retries=5,
    retry_delay=2.0,
    enable_durability=True
)
def call_external_api(endpoint: str, params: dict = None) -> dict:
    """Call external API with comprehensive error handling."""
    import requests
    
    response = requests.get(endpoint, params=params or {})
    response.raise_for_status()
    return response.json()

Durable Tool

Create tools with explicit durability guarantees:

from agnt5 import durable_tool

@durable_tool(name="critical_operation", timeout=300.0, max_retries=10)
def perform_critical_operation(data: dict) -> dict:
    """Critical operation that must complete successfully."""
    # This operation will retry automatically and survive system restarts
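    # complex_external_operation stands in for your own business logic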
    result = complex_external_operation(data)
    return {"success": True, "result": result}

ToolKit Organization

Organize related tools into tool kits:

from typing import List

from agnt5 import ToolKit, tool

class DatabaseTools(ToolKit):
    """Database operations toolkit."""
    
    def __init__(self, connection_string: str):
        super().__init__(name="database_tools")
        self.connection_string = connection_string
    
    @tool
    def create_user(self, username: str, email: str, role: str = "user") -> dict:
        """Create a new user in the database."""
        # Implementation here
        return {"user_id": "123", "created": True}
    
    @tool
    def get_user(self, user_id: str) -> dict:
        """Retrieve user by ID."""
        # Implementation here
        return {"user_id": user_id, "username": "john_doe", "email": "[email protected]"}
    
    @tool
    def update_user(self, user_id: str, updates: dict) -> dict:
        """Update user information."""
        # Implementation here
        return {"user_id": user_id, "updated": True}

class FileTools(ToolKit):
    """File operations toolkit."""
    
    @tool
    def list_files(self, directory: str) -> List[str]:
        """List files in directory."""
        from pathlib import Path
        path = Path(directory)
        return [f.name for f in path.iterdir() if f.is_file()]
    
    @tool
    def file_info(self, file_path: str) -> dict:
        """Get file information."""
        from pathlib import Path
        path = Path(file_path)
        stat = path.stat()
        return {
            "name": path.name,
            "size": stat.st_size,
            "modified": stat.st_mtime,
            "exists": path.exists()
        }

# Use toolkits
db_tools = DatabaseTools("postgresql://...")
file_tools = FileTools()

agent = Agent(
    name="admin_agent",
    tools=[db_tools, file_tools]  # Agent gets all tools from both kits
)

Integration with Agents

Agent Tool Usage

Agents automatically use tools when needed:

from typing import List

from agnt5 import Agent, tool

@tool
def get_stock_price(symbol: str) -> dict:
    """Get current stock price for a symbol."""
    # Mock implementation
    return {"symbol": symbol, "price": 150.25, "currency": "USD"}

@tool
def calculate_portfolio_value(holdings: List[dict]) -> float:
    """Calculate total portfolio value."""
    total = 0
    for holding in holdings:
        total += holding["shares"] * holding["price_per_share"]
    return total

agent = Agent(
    name="financial_advisor",
    model="gpt-4o",
    tools=[get_stock_price, calculate_portfolio_value],
    system_prompt="You help with financial analysis and portfolio management."
)

# Agent automatically uses tools to answer questions
response = await agent.run("What's the current price of AAPL and what would my portfolio be worth if I have 100 shares?")
# Agent calls get_stock_price("AAPL") and uses the result

Multi-Tool Workflows

Agents can chain multiple tools together:

@tool
def search_products(query: str, category: str = None) -> List[dict]:
    """Search for products in inventory."""
    # Implementation here
    return [{"id": "1", "name": "Product A", "price": 29.99}]

@tool
def check_inventory(product_id: str) -> dict:
    """Check inventory levels for a product."""
    # Implementation here
    return {"product_id": product_id, "quantity": 15, "available": True}

@tool
def create_order(product_id: str, quantity: int, customer_id: str) -> dict:
    """Create a new order."""
    # Implementation here
    return {"order_id": "order_123", "status": "created"}

sales_agent = Agent(
    name="sales_agent",
    model="gpt-4o",
    tools=[search_products, check_inventory, create_order],
    system_prompt="You help customers find and order products."
)

# Agent can use multiple tools in sequence
response = await sales_agent.run(
    "I'm looking for laptops under $1000. Can you check availability and create an order for customer ID 456?"
)
# Agent will: search_products -> check_inventory -> create_order

Tool Error Handling

Handle tool errors gracefully in agent interactions:

@tool
def unreliable_service(data: str) -> dict:
    """Service that might fail occasionally."""
    import random
    if random.random() < 0.3:  # 30% failure rate
        raise Exception("Service temporarily unavailable")
    return {"processed": data, "success": True}

@tool
def fallback_service(data: str) -> dict:
    """Reliable fallback service."""
    return {"processed": data, "success": True, "fallback": True}

smart_agent = Agent(
    name="smart_agent",
    model="gpt-4o",
    tools=[unreliable_service, fallback_service],
    system_prompt="""
    You can process data using tools. If the primary service fails, 
    use the fallback service and inform the user.
    """
)

# Agent handles tool failures and uses alternatives
response = await smart_agent.run("Process this data: 'important information'")

Integration with Workflows

Workflow Tool Orchestration

Use tools within workflows for complex operations:

from agnt5 import workflow, tool

@tool
def validate_data(data: dict) -> dict:
    """Validate input data."""
    # Validation logic here
    return {"valid": True, "cleaned_data": data}

@tool
def process_payment(payment_info: dict) -> dict:
    """Process payment through external service."""
    # Payment processing logic
    return {"transaction_id": "txn_123", "status": "success"}

@tool
def send_confirmation(order_details: dict, customer_email: str) -> dict:
    """Send order confirmation email."""
    # Email sending logic
    return {"sent": True, "email": customer_email}

@workflow
async def order_processing_workflow(order_data: dict) -> dict:
    """Complete order processing with tool integration."""
    # Step 1: Validate order data
    validation_result = await validate_data(order_data)
    
    if not validation_result["valid"]:
        return {"success": False, "error": "Invalid order data"}
    
    # Step 2: Process payment
    payment_result = await process_payment(order_data["payment"])
    
    if payment_result["status"] != "success":
        return {"success": False, "error": "Payment failed"}
    
    # Step 3: Send confirmation
    confirmation_result = await send_confirmation(
        order_data, 
        order_data["customer_email"]
    )
    
    return {
        "success": True,
        "order_id": order_data["id"],
        "transaction_id": payment_result["transaction_id"],
        "confirmation_sent": confirmation_result["sent"]
    }

Parallel Tool Execution

Execute multiple tools in parallel within workflows:

@tool
def fetch_user_profile(user_id: str) -> dict:
    """Fetch user profile data."""
    return {"user_id": user_id, "name": "John Doe"}

@tool
def fetch_user_preferences(user_id: str) -> dict:
    """Fetch user preferences."""
    return {"user_id": user_id, "theme": "dark", "notifications": True}

@tool
def fetch_user_activity(user_id: str) -> dict:
    """Fetch recent user activity."""
    return {"user_id": user_id, "last_login": "2024-01-15", "actions": 42}

@workflow
async def user_dashboard_workflow(user_id: str) -> dict:
    """Build user dashboard with parallel data fetching."""
    import asyncio
    
    # Fetch all user data in parallel
    profile_task = fetch_user_profile(user_id)
    preferences_task = fetch_user_preferences(user_id)
    activity_task = fetch_user_activity(user_id)
    
    # Wait for all to complete
    profile, preferences, activity = await asyncio.gather(
        profile_task, preferences_task, activity_task
    )
    
    return {
        "user_id": user_id,
        "profile": profile,
        "preferences": preferences,
        "activity": activity,
        "dashboard_ready": True
    }

Common Tool Patterns

API Integration Tools

Connect to external services with proper error handling:

@tool
def slack_notification(channel: str, message: str, webhook_url: str) -> dict:
    """Send notification to Slack channel."""
    import requests
    
    payload = {
        "channel": channel,
        "text": message,
        "username": "AGNT5-Bot"
    }
    
    response = requests.post(webhook_url, json=payload)
    
    return {
        "sent": response.status_code == 200,
        "status_code": response.status_code,
        "channel": channel
    }

@tool 
def github_create_issue(repo: str, title: str, body: str, token: str) -> dict:
    """Create GitHub issue."""
    import requests
    
    url = f"https://api.github.com/repos/{repo}/issues"
    headers = {
        "Authorization": f"token {token}",
        "Accept": "application/vnd.github.v3+json"
    }
    
    data = {"title": title, "body": body}
    response = requests.post(url, json=data, headers=headers)
    
    if response.status_code == 201:
        issue = response.json()
        return {
            "created": True,
            "issue_number": issue["number"],
            "url": issue["html_url"]
        }
    else:
        return {"created": False, "error": response.text}

@tool
def stripe_create_payment(amount: int, currency: str, customer_id: str) -> dict:
    """Create Stripe payment intent."""
    import stripe
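    # Assumes stripe.api_key has been configured elsewhere (for example, from an environment variable)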
    
    try:
        intent = stripe.PaymentIntent.create(
            amount=amount,
            currency=currency,
            customer=customer_id
        )
        
        return {
            "success": True,
            "payment_intent_id": intent.id,
            "client_secret": intent.client_secret
        }
    except stripe.error.StripeError as e:
        return {
            "success": False,
            "error": str(e)
        }

Data Processing Tools

Handle data transformation and analysis:

import pandas as pd
from typing import Any, List

@tool
def analyze_csv_data(file_path: str, analysis_type: str = "summary") -> dict:
    """Analyze CSV data with pandas."""
    df = pd.read_csv(file_path)
    
    if analysis_type == "summary":
        return {
            "rows": len(df),
            "columns": len(df.columns),
            "column_names": df.columns.tolist(),
            "summary_stats": df.describe().to_dict()
        }
    elif analysis_type == "missing":
        return {
            "missing_values": df.isnull().sum().to_dict(),
            "missing_percentage": (df.isnull().sum() / len(df) * 100).to_dict()
        }
    else:
        return {"error": f"Unknown analysis type: {analysis_type}"}

@tool
def transform_json_data(data: List[dict], transformations: List[str]) -> List[dict]:
    """Transform JSON data with specified operations."""
    import copy
    result = copy.deepcopy(data)
    
    for transform in transformations:
        if transform == "lowercase_keys":
            result = [{k.lower(): v for k, v in item.items()} for item in result]
        elif transform == "remove_nulls":
            result = [{k: v for k, v in item.items() if v is not None} for item in result]
        elif transform.startswith("filter_"):
            # filter_field_value format
            _, field, value = transform.split("_", 2)
            result = [item for item in result if item.get(field) == value]
    
    return result

@tool
def generate_report(data: Any, template: str = "html") -> str:
    """Generate formatted report from data."""
    if template == "html":
        html = "<html><body><h1>Data Report</h1>"
        if isinstance(data, dict):
            html += "<ul>"
            for key, value in data.items():
                html += f"<li><strong>{key}:</strong> {value}</li>"
            html += "</ul>"
        html += "</body></html>"
        return html
    elif template == "markdown":
        if isinstance(data, dict):
            md = "# Data Report\n\n"
            for key, value in data.items():
                md += f"- **{key}:** {value}\n"
            return md
        else:
            return f"# Data Report\n\n{str(data)}"
    else:
        return str(data)

Monitoring and Alerting Tools

Implement monitoring and alerting capabilities:

from datetime import datetime
from typing import List

@tool
def log_metric(metric_name: str, value: float, tags: dict = None) -> dict:
    """Log a metric for monitoring."""
    # In production, send to monitoring service like DataDog, CloudWatch, etc.
    timestamp = datetime.now().isoformat()
    
    metric_data = {
        "name": metric_name,
        "value": value,
        "timestamp": timestamp,
        "tags": tags or {}
    }
    
    # Log to monitoring service
    print(f"METRIC: {metric_data}")  # Replace with actual monitoring integration
    
    return {"logged": True, "metric": metric_data}

@tool
async def check_system_health(service_name: str, endpoint: str) -> dict:
    """Check health of a system service."""
    import requests
    
    try:
        start_time = datetime.now()
        response = requests.get(endpoint, timeout=10)
        end_time = datetime.now()
        
        response_time = (end_time - start_time).total_seconds()
        
        health_status = {
            "service": service_name,
            "healthy": response.status_code == 200,
            "status_code": response.status_code,
            "response_time_seconds": response_time,
            "timestamp": start_time.isoformat()
        }
        
        # Log metric
        await log_metric(f"{service_name}.response_time", response_time, {"service": service_name})
        await log_metric(f"{service_name}.status", 1 if health_status["healthy"] else 0, {"service": service_name})
        
        return health_status
        
    except Exception as e:
        return {
            "service": service_name,
            "healthy": False,
            "error": str(e),
            "timestamp": datetime.now().isoformat()
        }

@tool
def send_alert(severity: str, message: str, channels: List[str] = None) -> dict:
    """Send alert to configured channels."""
    channels = channels or ["email", "slack"]
    alert_data = {
        "severity": severity,
        "message": message,
        "timestamp": datetime.now().isoformat(),
        "alert_id": f"alert_{int(datetime.now().timestamp())}"
    }
    
    results = []
    for channel in channels:
        if channel == "email":
            # Send email alert
            result = {"channel": "email", "sent": True}
        elif channel == "slack":
            # Send Slack alert  
            result = {"channel": "slack", "sent": True}
        else:
            result = {"channel": channel, "sent": False, "error": "Unknown channel"}
        
        results.append(result)
    
    return {
        "alert_id": alert_data["alert_id"],
        "severity": severity,
        "channels_notified": [r["channel"] for r in results if r["sent"]],
        "results": results
    }

Advanced Tool Features

Tool Composition

Combine multiple tools for complex operations:

@tool
def backup_and_analyze_data(source_db: str, backup_location: str) -> dict:
    """Composite tool that backs up data and provides analysis."""
    # Step 1: Backup data
    backup_result = backup_database(source_db, backup_location)
    
    if not backup_result["success"]:
        return {"success": False, "error": "Backup failed"}
    
    # Step 2: Analyze backed up data
    analysis_result = analyze_database(backup_location)
    
    # Step 3: Generate report
    report = generate_backup_report(backup_result, analysis_result)
    
    return {
        "success": True,
        "backup_path": backup_result["path"],
        "analysis": analysis_result,
        "report": report
    }

Dynamic Tool Creation

Create tools dynamically based on configuration:

def create_api_tool(api_name: str, base_url: str, endpoints: dict) -> tool:
    """Dynamically create API integration tool."""
    
    @tool(name=f"{api_name}_api")
    def api_tool(endpoint: str, method: str = "GET", data: dict = None) -> dict:
        f"""Call {api_name} API endpoint.
        
        Available endpoints: {list(endpoints.keys())}
        """
        import requests
        
        if endpoint not in endpoints:
            return {"error": f"Unknown endpoint: {endpoint}"}
        
        url = f"{base_url}{endpoints[endpoint]}"
        
        if method == "GET":
            response = requests.get(url, params=data)
        elif method == "POST":
            response = requests.post(url, json=data)
        else:
            return {"error": f"Unsupported method: {method}"}
        
        return {
            "status_code": response.status_code,
            "data": response.json() if response.headers.get("content-type", "").startswith("application/json") else response.text
        }
    
    return api_tool

# Create specific API tools
github_tool = create_api_tool("github", "https://api.github.com", {
    "user": "/user",
    "repos": "/user/repos",
    "issues": "/repos/{owner}/{repo}/issues"
})

stripe_tool = create_api_tool("stripe", "https://api.stripe.com/v1", {
    "customers": "/customers",
    "payments": "/payment_intents",
    "subscriptions": "/subscriptions"
})

Tool Middleware

Add middleware for logging, authentication, and monitoring:

from datetime import datetime
from functools import wraps

def with_auth(auth_required: bool = True):
    """Middleware to add authentication to tools."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            if auth_required:
                # Check authentication
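                # validate_token is an application-provided check, not part of AGNT5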
                auth_token = kwargs.get("auth_token")  # keep it in kwargs so the wrapped function still receives it
                if not auth_token or not validate_token(auth_token):
                    return {"error": "Authentication required"}
            
            return await func(*args, **kwargs)
        return wrapper
    return decorator

def with_logging(log_inputs: bool = True, log_outputs: bool = True):
    """Middleware to add logging to tools."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            if log_inputs:
                print(f"Tool {func.__name__} called with args: {args}, kwargs: {kwargs}")
            
            start_time = datetime.now()
            result = await func(*args, **kwargs)
            end_time = datetime.now()
            
            if log_outputs:
                print(f"Tool {func.__name__} completed in {(end_time - start_time).total_seconds()}s")
                print(f"Result: {result}")
            
            return result
        return wrapper
    return decorator

# Use middleware with tools
@tool
@with_auth(auth_required=True)
@with_logging(log_inputs=True, log_outputs=True)
async def secure_operation(data: str, auth_token: str) -> dict:
    """Secure operation with auth and logging."""
    return {"processed": data, "timestamp": datetime.now().isoformat()}

Performance Optimization

Connection Pooling

Reuse connections for better performance:

import threading
from typing import List, Optional
import psycopg2
from psycopg2 import pool

class DatabaseConnectionPool:
    _instance: Optional['DatabaseConnectionPool'] = None
    _lock = threading.Lock()
    
    def __new__(cls):
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
                    cls._instance._initialize_pool()
        return cls._instance
    
    def _initialize_pool(self):
        self.connection_pool = psycopg2.pool.ThreadedConnectionPool(
            minconn=1,
            maxconn=20,
            host="localhost",
            database="mydb",
            user="user",
            password="password"
        )
    
    def get_connection(self):
        return self.connection_pool.getconn()
    
    def put_connection(self, conn):
        self.connection_pool.putconn(conn)

@tool
def efficient_db_query(query: str) -> List[dict]:
    """Database query using connection pool."""
    pool = DatabaseConnectionPool()
    conn = pool.get_connection()
    
    try:
        cursor = conn.cursor()
        cursor.execute(query)
        columns = [desc[0] for desc in cursor.description]
        results = [dict(zip(columns, row)) for row in cursor.fetchall()]
        return results
    finally:
        pool.put_connection(conn)

Caching

Cache expensive operations:

from functools import lru_cache
import time

@lru_cache(maxsize=100)
def _cached_computation(data: str) -> str:
    # Defined at module level so results are memoized across tool calls
    # Simulate an expensive operation
    time.sleep(2)
    return f"processed_{data}"

@tool
def expensive_computation(input_data: str) -> dict:
    """Expensive computation with caching."""
    result = _cached_computation(input_data)
    return {"input": input_data, "result": result}

# Redis-based caching for distributed systems
import redis
import json

redis_client = redis.Redis(host='localhost', port=6379, db=0)

@tool
def cached_api_call(endpoint: str, cache_ttl: int = 300) -> dict:
    """API call with Redis caching."""
    cache_key = f"api_call:{endpoint}"
    
    # Check cache first
    cached_result = redis_client.get(cache_key)
    if cached_result:
        return json.loads(cached_result)
    
    # Make API call
    import requests
    response = requests.get(endpoint)
    result = response.json()
    
    # Cache result
    redis_client.setex(cache_key, cache_ttl, json.dumps(result))
    
    return result

Batch Operations

Process multiple items efficiently:

@tool
def batch_email_send(emails: List[dict], batch_size: int = 10) -> dict:
    """Send emails in batches for better performance."""
    from concurrent.futures import ThreadPoolExecutor
    
    def send_single_email(email_data):
        # Single email sending logic
        return {"email": email_data["to"], "sent": True}
    
    def send_batch(batch):
        with ThreadPoolExecutor(max_workers=5) as executor:
            results = list(executor.map(send_single_email, batch))
        return results
    
    # Process in batches
    all_results = []
    for i in range(0, len(emails), batch_size):
        batch = emails[i:i + batch_size]
        batch_results = send_batch(batch)
        all_results.extend(batch_results)
    
    successful = sum(1 for r in all_results if r["sent"])
    
    return {
        "total_emails": len(emails),
        "successful": successful,
        "failed": len(emails) - successful,
        "results": all_results
    }

Best Practices

Tool Design:

  • Each tool should have one clear purpose
  • Use type hints for all parameters and return values
  • Provide clear docstrings with parameter descriptions
  • Handle expected errors gracefully

Performance:

  • Pool database connections and HTTP clients
  • Cache expensive operations when appropriate
  • Group similar operations for efficiency
  • Use async/await for I/O-bound operations
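
For I/O-bound tools, writing the function as a coroutine keeps it from blocking while waiting on the network. A minimal sketch, assuming the @tool decorator also accepts async functions (not shown above) and using the httpx client as an illustrative choice:

import httpx
from agnt5 import tool

@tool(timeout=30.0, max_retries=3)
async def fetch_json(url: str) -> dict:
    """Fetch JSON from a URL without blocking the event loop."""
    # The async client yields control while waiting on the network,
    # so other tool calls can make progress in the meantime.
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        response.raise_for_status()
        return response.json()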

Security:

  • Validate all inputs before processing
  • Implement proper authentication for sensitive operations
  • Prevent abuse with rate limiting
  • Use environment variables for sensitive data
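
A minimal sketch of the validation and credential guidelines above; the ALLOWED_TABLES whitelist and the API_TOKEN variable are illustrative assumptions, not part of AGNT5:

import os
from agnt5 import tool

ALLOWED_TABLES = {"users", "orders"}  # hypothetical whitelist of safe table names

@tool
def count_rows(table: str) -> dict:
    """Count rows in an allowed table."""
    # Validate input before it reaches the database
    if table not in ALLOWED_TABLES:
        raise ValueError(f"Table not allowed: {table}")

    # Read credentials from the environment, never from code or model output
    api_token = os.getenv("API_TOKEN")
    if not api_token:
        raise RuntimeError("API_TOKEN environment variable is not set")

    # ...run the query using the validated table name and credential...
    return {"table": table, "validated": True}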

Monitoring:

  • Log tool usage and performance metrics
  • Monitor and alert on tool failures
  • Track response times and success rates
  • Monitor which tools are used most frequently

Next Steps

Tools provide the bridge between AGNT5 and external systems. Learn how they integrate with other components:

  • Tasks - Create durable functions that use tools
  • Workflows - Orchestrate multiple tools in complex operations
  • Agents - Give AI agents access to your tools
  • Entities - Build stateful components that use tools

Ready to connect your first external system? Check out the Quick Start Guide to start building with tools.