Tools
Extend functionality by providing access to external systems, APIs, and custom functions that bridge workflows with the real world
Tools extend functionality by providing access to external systems, APIs, and custom functions.
They bridge your workflows with the real world, enabling agents and tasks to interact with databases, APIs, file systems, and any external service.
Every tool provides automatic schema generation, validation, and error handling with built-in retry mechanisms.
What Tools Enable
Tools connect your AGNT5 applications to external systems and capabilities:
- Connect to databases and external APIs with automatic retry logic
- Execute custom functions with parameter validation and error handling
- Process files, images, and media with type-safe interfaces
- Send notifications and webhooks with delivery guarantees
- Integrate with existing systems and infrastructure seamlessly
Key Features
Function Integration
Convert any Python function into a tool with automatic schema generation:
from agnt5 import tool
@tool
def calculate_compound_interest(principal: float, rate: float, time: int, compounds_per_year: int = 1) -> float:
"""Calculate compound interest over time.
Args:
principal: Initial investment amount
rate: Annual interest rate (as decimal, e.g., 0.05 for 5%)
time: Time period in years
compounds_per_year: Number of times interest compounds per year
Returns:
Final amount after compound interest
"""
return principal * (1 + rate / compounds_per_year) ** (compounds_per_year * time)
# Tool automatically generates schema for agents
schema = calculate_compound_interest.get_schema()
# Returns OpenAI function schema for agent integration
API Connectivity
Connect to REST APIs and web services with built-in resilience:
import requests
from typing import Dict, Any
@tool(timeout=30.0, max_retries=3)
def fetch_weather_data(city: str, api_key: str) -> Dict[str, Any]:
"""Fetch current weather data for a city.
Args:
city: Name of the city
api_key: Weather API key
Returns:
Weather data including temperature, humidity, conditions
"""
url = f"https://api.weather.com/v1/current"
params = {"city": city, "key": api_key}
response = requests.get(url, params=params)
response.raise_for_status()
data = response.json()
return {
"city": city,
"temperature": data["current"]["temp_f"],
"humidity": data["current"]["humidity"],
"conditions": data["current"]["condition"]["text"],
"timestamp": data["current"]["last_updated"]
}
# Automatic retry on network failures
weather = await fetch_weather_data("San Francisco", "your-api-key")
System Access
Controlled access to databases and file systems:
import sqlite3
from pathlib import Path
@tool
def query_database(query: str, database_path: str) -> list[dict]:
"""Execute SQL query and return results.
Args:
query: SQL query to execute
database_path: Path to SQLite database
Returns:
List of rows as dictionaries
"""
conn = sqlite3.connect(database_path)
conn.row_factory = sqlite3.Row # Return rows as dictionaries
try:
cursor = conn.execute(query)
results = [dict(row) for row in cursor.fetchall()]
return results
finally:
conn.close()
@tool
def read_file_content(file_path: str, encoding: str = "utf-8") -> str:
"""Read content from a file.
Args:
file_path: Path to the file to read
encoding: File encoding (default: utf-8)
Returns:
File content as string
"""
path = Path(file_path)
if not path.exists():
raise FileNotFoundError(f"File not found: {file_path}")
return path.read_text(encoding=encoding)
@tool
def write_file_content(file_path: str, content: str, encoding: str = "utf-8") -> dict:
"""Write content to a file.
Args:
file_path: Path to the file to write
content: Content to write
encoding: File encoding (default: utf-8)
Returns:
Result with file info
"""
path = Path(file_path)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(content, encoding=encoding)
return {
"file_path": str(path),
"size_bytes": path.stat().st_size,
"created": True
}
Webhook Handling
Process inbound and send outbound webhooks:
import json
from datetime import datetime
@tool
def send_webhook(url: str, payload: dict, headers: dict = None) -> dict:
"""Send webhook to external system.
Args:
url: Webhook endpoint URL
payload: Data to send
headers: Optional HTTP headers
Returns:
Response status and details
"""
import requests
headers = headers or {"Content-Type": "application/json"}
response = requests.post(
url,
json=payload,
headers=headers,
timeout=30
)
return {
"status_code": response.status_code,
"success": response.status_code < 400,
"response_data": response.json() if response.headers.get("content-type", "").startswith("application/json") else response.text,
"timestamp": datetime.now().isoformat()
}
@tool
def process_webhook_event(event_data: dict) -> dict:
"""Process incoming webhook event.
Args:
event_data: Webhook event payload
Returns:
Processing result
"""
event_type = event_data.get("type")
timestamp = event_data.get("timestamp")
if event_type == "payment.completed":
return process_payment_webhook(event_data)
elif event_type == "user.created":
return process_user_webhook(event_data)
else:
return {
"processed": False,
"error": f"Unknown event type: {event_type}",
"timestamp": timestamp
}
Custom Implementations
Build domain-specific tools for your use case:
from typing import List
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
@tool
def send_email(
to_emails: List[str],
subject: str,
body: str,
from_email: str,
smtp_server: str = "smtp.gmail.com",
smtp_port: int = 587
) -> dict:
"""Send email notification.
Args:
to_emails: List of recipient email addresses
subject: Email subject
body: Email body content
from_email: Sender email address
smtp_server: SMTP server hostname
smtp_port: SMTP server port
Returns:
Delivery status for each recipient
"""
msg = MIMEMultipart()
msg['From'] = from_email
msg['Subject'] = subject
msg.attach(MIMEText(body, 'plain'))
results = []
try:
server = smtplib.SMTP(smtp_server, smtp_port)
server.starttls()
# Note: Use environment variables for credentials in production
server.login(from_email, os.getenv("EMAIL_PASSWORD"))
for email in to_emails:
try:
msg['To'] = email
server.send_message(msg)
results.append({"email": email, "sent": True, "error": None})
del msg['To'] # Remove for next iteration
except Exception as e:
results.append({"email": email, "sent": False, "error": str(e)})
server.quit()
except Exception as e:
# If server connection fails, mark all as failed
for email in to_emails:
results.append({"email": email, "sent": False, "error": str(e)})
return {
"total_emails": len(to_emails),
"successful": sum(1 for r in results if r["sent"]),
"failed": sum(1 for r in results if not r["sent"]),
"results": results
}
Creating Tools
Basic Tool
Convert any function into a tool:
from agnt5 import tool
@tool
def add_numbers(a: float, b: float) -> float:
"""Add two numbers together."""
return a + b
# Use directly
result = await add_numbers(10, 20) # Returns 30
# Use with agents
from agnt5 import Agent
agent = Agent(name="math_agent", tools=[add_numbers])
response = await agent.run("What's 15 plus 25?") # Agent uses the tool
Tool with Configuration
Configure retry behavior, timeouts, and durability:
@tool(
name="external_api_call",
description="Call external API with retry logic",
timeout=60.0,
max_retries=5,
retry_delay=2.0,
enable_durability=True
)
def call_external_api(endpoint: str, params: dict = None) -> dict:
"""Call external API with comprehensive error handling."""
import requests
response = requests.get(endpoint, params=params or {})
response.raise_for_status()
return response.json()
Durable Tool
Create tools with explicit durability guarantees:
from agnt5 import durable_tool
@durable_tool(name="critical_operation", timeout=300.0, max_retries=10)
def perform_critical_operation(data: dict) -> dict:
"""Critical operation that must complete successfully."""
# This operation will retry automatically and survive system restarts
result = complex_external_operation(data)
return {"success": True, "result": result}
ToolKit Organization
Organize related tools into tool kits:
from agnt5 import ToolKit, tool
class DatabaseTools(ToolKit):
"""Database operations toolkit."""
def __init__(self, connection_string: str):
super().__init__(name="database_tools")
self.connection_string = connection_string
@tool
def create_user(self, username: str, email: str, role: str = "user") -> dict:
"""Create a new user in the database."""
# Implementation here
return {"user_id": "123", "created": True}
@tool
def get_user(self, user_id: str) -> dict:
"""Retrieve user by ID."""
# Implementation here
return {"user_id": user_id, "username": "john_doe", "email": "[email protected]"}
@tool
def update_user(self, user_id: str, updates: dict) -> dict:
"""Update user information."""
# Implementation here
return {"user_id": user_id, "updated": True}
class FileTools(ToolKit):
"""File operations toolkit."""
@tool
def list_files(self, directory: str) -> List[str]:
"""List files in directory."""
from pathlib import Path
path = Path(directory)
return [f.name for f in path.iterdir() if f.is_file()]
@tool
def file_info(self, file_path: str) -> dict:
"""Get file information."""
from pathlib import Path
path = Path(file_path)
stat = path.stat()
return {
"name": path.name,
"size": stat.st_size,
"modified": stat.st_mtime,
"exists": path.exists()
}
# Use toolkits
db_tools = DatabaseTools("postgresql://...")
file_tools = FileTools()
agent = Agent(
name="admin_agent",
tools=[db_tools, file_tools] # Agent gets all tools from both kits
)
Integration with Agents
Agent Tool Usage
Agents automatically use tools when needed:
from agnt5 import Agent, tool
@tool
def get_stock_price(symbol: str) -> dict:
"""Get current stock price for a symbol."""
# Mock implementation
return {"symbol": symbol, "price": 150.25, "currency": "USD"}
@tool
def calculate_portfolio_value(holdings: List[dict]) -> float:
"""Calculate total portfolio value."""
total = 0
for holding in holdings:
total += holding["shares"] * holding["price_per_share"]
return total
agent = Agent(
name="financial_advisor",
model="gpt-4o",
tools=[get_stock_price, calculate_portfolio_value],
system_prompt="You help with financial analysis and portfolio management."
)
# Agent automatically uses tools to answer questions
response = await agent.run("What's the current price of AAPL and what would my portfolio be worth if I have 100 shares?")
# Agent calls get_stock_price("AAPL") and uses the result
Multi-Tool Workflows
Agents can chain multiple tools together:
@tool
def search_products(query: str, category: str = None) -> List[dict]:
"""Search for products in inventory."""
# Implementation here
return [{"id": "1", "name": "Product A", "price": 29.99}]
@tool
def check_inventory(product_id: str) -> dict:
"""Check inventory levels for a product."""
# Implementation here
return {"product_id": product_id, "quantity": 15, "available": True}
@tool
def create_order(product_id: str, quantity: int, customer_id: str) -> dict:
"""Create a new order."""
# Implementation here
return {"order_id": "order_123", "status": "created"}
sales_agent = Agent(
name="sales_agent",
model="gpt-4o",
tools=[search_products, check_inventory, create_order],
system_prompt="You help customers find and order products."
)
# Agent can use multiple tools in sequence
response = await sales_agent.run(
"I'm looking for laptops under $1000. Can you check availability and create an order for customer ID 456?"
)
# Agent will: search_products -> check_inventory -> create_order
Tool Error Handling
Handle tool errors gracefully in agent interactions:
@tool
def unreliable_service(data: str) -> dict:
"""Service that might fail occasionally."""
import random
if random.random() < 0.3: # 30% failure rate
raise Exception("Service temporarily unavailable")
return {"processed": data, "success": True}
@tool
def fallback_service(data: str) -> dict:
"""Reliable fallback service."""
return {"processed": data, "success": True, "fallback": True}
smart_agent = Agent(
name="smart_agent",
model="gpt-4o",
tools=[unreliable_service, fallback_service],
system_prompt="""
You can process data using tools. If the primary service fails,
use the fallback service and inform the user.
"""
)
# Agent handles tool failures and uses alternatives
response = await smart_agent.run("Process this data: 'important information'")
Integration with Workflows
Workflow Tool Orchestration
Use tools within workflows for complex operations:
from agnt5 import workflow, tool
@tool
def validate_data(data: dict) -> dict:
"""Validate input data."""
# Validation logic here
return {"valid": True, "cleaned_data": data}
@tool
def process_payment(payment_info: dict) -> dict:
"""Process payment through external service."""
# Payment processing logic
return {"transaction_id": "txn_123", "status": "success"}
@tool
def send_confirmation(order_details: dict, customer_email: str) -> dict:
"""Send order confirmation email."""
# Email sending logic
return {"sent": True, "email": customer_email}
@workflow
async def order_processing_workflow(order_data: dict) -> dict:
"""Complete order processing with tool integration."""
# Step 1: Validate order data
validation_result = await validate_data(order_data)
if not validation_result["valid"]:
return {"success": False, "error": "Invalid order data"}
# Step 2: Process payment
payment_result = await process_payment(order_data["payment"])
if payment_result["status"] != "success":
return {"success": False, "error": "Payment failed"}
# Step 3: Send confirmation
confirmation_result = await send_confirmation(
order_data,
order_data["customer_email"]
)
return {
"success": True,
"order_id": order_data["id"],
"transaction_id": payment_result["transaction_id"],
"confirmation_sent": confirmation_result["sent"]
}
Parallel Tool Execution
Execute multiple tools in parallel within workflows:
@tool
def fetch_user_profile(user_id: str) -> dict:
"""Fetch user profile data."""
return {"user_id": user_id, "name": "John Doe"}
@tool
def fetch_user_preferences(user_id: str) -> dict:
"""Fetch user preferences."""
return {"user_id": user_id, "theme": "dark", "notifications": True}
@tool
def fetch_user_activity(user_id: str) -> dict:
"""Fetch recent user activity."""
return {"user_id": user_id, "last_login": "2024-01-15", "actions": 42}
@workflow
async def user_dashboard_workflow(user_id: str) -> dict:
"""Build user dashboard with parallel data fetching."""
import asyncio
# Fetch all user data in parallel
profile_task = fetch_user_profile(user_id)
preferences_task = fetch_user_preferences(user_id)
activity_task = fetch_user_activity(user_id)
# Wait for all to complete
profile, preferences, activity = await asyncio.gather(
profile_task, preferences_task, activity_task
)
return {
"user_id": user_id,
"profile": profile,
"preferences": preferences,
"activity": activity,
"dashboard_ready": True
}
Common Tool Patterns
API Integration Tools
Connect to external services with proper error handling:
@tool
def slack_notification(channel: str, message: str, webhook_url: str) -> dict:
"""Send notification to Slack channel."""
import requests
payload = {
"channel": channel,
"text": message,
"username": "AGNT5-Bot"
}
response = requests.post(webhook_url, json=payload)
return {
"sent": response.status_code == 200,
"status_code": response.status_code,
"channel": channel
}
@tool
def github_create_issue(repo: str, title: str, body: str, token: str) -> dict:
"""Create GitHub issue."""
import requests
url = f"https://api.github.com/repos/{repo}/issues"
headers = {
"Authorization": f"token {token}",
"Accept": "application/vnd.github.v3+json"
}
data = {"title": title, "body": body}
response = requests.post(url, json=data, headers=headers)
if response.status_code == 201:
issue = response.json()
return {
"created": True,
"issue_number": issue["number"],
"url": issue["html_url"]
}
else:
return {"created": False, "error": response.text}
@tool
def stripe_create_payment(amount: int, currency: str, customer_id: str) -> dict:
"""Create Stripe payment intent."""
import stripe
try:
intent = stripe.PaymentIntent.create(
amount=amount,
currency=currency,
customer=customer_id
)
return {
"success": True,
"payment_intent_id": intent.id,
"client_secret": intent.client_secret
}
except stripe.error.StripeError as e:
return {
"success": False,
"error": str(e)
}
Data Processing Tools
Handle data transformation and analysis:
import pandas as pd
from typing import Any
@tool
def analyze_csv_data(file_path: str, analysis_type: str = "summary") -> dict:
"""Analyze CSV data with pandas."""
df = pd.read_csv(file_path)
if analysis_type == "summary":
return {
"rows": len(df),
"columns": len(df.columns),
"column_names": df.columns.tolist(),
"summary_stats": df.describe().to_dict()
}
elif analysis_type == "missing":
return {
"missing_values": df.isnull().sum().to_dict(),
"missing_percentage": (df.isnull().sum() / len(df) * 100).to_dict()
}
else:
return {"error": f"Unknown analysis type: {analysis_type}"}
@tool
def transform_json_data(data: List[dict], transformations: List[str]) -> List[dict]:
"""Transform JSON data with specified operations."""
import copy
result = copy.deepcopy(data)
for transform in transformations:
if transform == "lowercase_keys":
result = [{k.lower(): v for k, v in item.items()} for item in result]
elif transform == "remove_nulls":
result = [{k: v for k, v in item.items() if v is not None} for item in result]
elif transform.startswith("filter_"):
# filter_field_value format
_, field, value = transform.split("_", 2)
result = [item for item in result if item.get(field) == value]
return result
@tool
def generate_report(data: Any, template: str = "html") -> str:
"""Generate formatted report from data."""
if template == "html":
html = "<html><body><h1>Data Report</h1>"
if isinstance(data, dict):
html += "<ul>"
for key, value in data.items():
html += f"<li><strong>{key}:</strong> {value}</li>"
html += "</ul>"
html += "</body></html>"
return html
elif template == "markdown":
if isinstance(data, dict):
md = "# Data Report\n\n"
for key, value in data.items():
md += f"- **{key}:** {value}\n"
return md
else:
return f"# Data Report\n\n{str(data)}"
else:
return str(data)
Monitoring and Alerting Tools
Implement monitoring and alerting capabilities:
from datetime import datetime, timedelta
@tool
def log_metric(metric_name: str, value: float, tags: dict = None) -> dict:
"""Log a metric for monitoring."""
# In production, send to monitoring service like DataDog, CloudWatch, etc.
timestamp = datetime.now().isoformat()
metric_data = {
"name": metric_name,
"value": value,
"timestamp": timestamp,
"tags": tags or {}
}
# Log to monitoring service
print(f"METRIC: {metric_data}") # Replace with actual monitoring integration
return {"logged": True, "metric": metric_data}
@tool
def check_system_health(service_name: str, endpoint: str) -> dict:
"""Check health of a system service."""
import requests
try:
start_time = datetime.now()
response = requests.get(endpoint, timeout=10)
end_time = datetime.now()
response_time = (end_time - start_time).total_seconds()
health_status = {
"service": service_name,
"healthy": response.status_code == 200,
"status_code": response.status_code,
"response_time_seconds": response_time,
"timestamp": start_time.isoformat()
}
# Log metric
await log_metric(f"{service_name}.response_time", response_time, {"service": service_name})
await log_metric(f"{service_name}.status", 1 if health_status["healthy"] else 0, {"service": service_name})
return health_status
except Exception as e:
return {
"service": service_name,
"healthy": False,
"error": str(e),
"timestamp": datetime.now().isoformat()
}
@tool
def send_alert(severity: str, message: str, channels: List[str] = None) -> dict:
"""Send alert to configured channels."""
channels = channels or ["email", "slack"]
alert_data = {
"severity": severity,
"message": message,
"timestamp": datetime.now().isoformat(),
"alert_id": f"alert_{int(datetime.now().timestamp())}"
}
results = []
for channel in channels:
if channel == "email":
# Send email alert
result = {"channel": "email", "sent": True}
elif channel == "slack":
# Send Slack alert
result = {"channel": "slack", "sent": True}
else:
result = {"channel": channel, "sent": False, "error": "Unknown channel"}
results.append(result)
return {
"alert_id": alert_data["alert_id"],
"severity": severity,
"channels_notified": [r["channel"] for r in results if r["sent"]],
"results": results
}
Advanced Tool Features
Tool Composition
Combine multiple tools for complex operations:
@tool
def backup_and_analyze_data(source_db: str, backup_location: str) -> dict:
"""Composite tool that backs up data and provides analysis."""
# Step 1: Backup data
backup_result = backup_database(source_db, backup_location)
if not backup_result["success"]:
return {"success": False, "error": "Backup failed"}
# Step 2: Analyze backed up data
analysis_result = analyze_database(backup_location)
# Step 3: Generate report
report = generate_backup_report(backup_result, analysis_result)
return {
"success": True,
"backup_path": backup_result["path"],
"analysis": analysis_result,
"report": report
}
Dynamic Tool Creation
Create tools dynamically based on configuration:
def create_api_tool(api_name: str, base_url: str, endpoints: dict) -> tool:
"""Dynamically create API integration tool."""
@tool(name=f"{api_name}_api")
def api_tool(endpoint: str, method: str = "GET", data: dict = None) -> dict:
f"""Call {api_name} API endpoint.
Available endpoints: {list(endpoints.keys())}
"""
import requests
if endpoint not in endpoints:
return {"error": f"Unknown endpoint: {endpoint}"}
url = f"{base_url}{endpoints[endpoint]}"
if method == "GET":
response = requests.get(url, params=data)
elif method == "POST":
response = requests.post(url, json=data)
else:
return {"error": f"Unsupported method: {method}"}
return {
"status_code": response.status_code,
"data": response.json() if response.headers.get("content-type", "").startswith("application/json") else response.text
}
return api_tool
# Create specific API tools
github_tool = create_api_tool("github", "https://api.github.com", {
"user": "/user",
"repos": "/user/repos",
"issues": "/repos/{owner}/{repo}/issues"
})
stripe_tool = create_api_tool("stripe", "https://api.stripe.com/v1", {
"customers": "/customers",
"payments": "/payment_intents",
"subscriptions": "/subscriptions"
})
Tool Middleware
Add middleware for logging, authentication, and monitoring:
from functools import wraps
def with_auth(auth_required: bool = True):
"""Middleware to add authentication to tools."""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
if auth_required:
# Check authentication
auth_token = kwargs.pop("auth_token", None)
if not auth_token or not validate_token(auth_token):
return {"error": "Authentication required"}
return await func(*args, **kwargs)
return wrapper
return decorator
def with_logging(log_inputs: bool = True, log_outputs: bool = True):
"""Middleware to add logging to tools."""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
if log_inputs:
print(f"Tool {func.__name__} called with args: {args}, kwargs: {kwargs}")
start_time = datetime.now()
result = await func(*args, **kwargs)
end_time = datetime.now()
if log_outputs:
print(f"Tool {func.__name__} completed in {(end_time - start_time).total_seconds()}s")
print(f"Result: {result}")
return result
return wrapper
return decorator
# Use middleware with tools
@tool
@with_auth(auth_required=True)
@with_logging(log_inputs=True, log_outputs=True)
def secure_operation(data: str, auth_token: str) -> dict:
"""Secure operation with auth and logging."""
return {"processed": data, "timestamp": datetime.now().isoformat()}
Performance Optimization
Connection Pooling
Reuse connections for better performance:
import threading
from typing import Optional
import psycopg2
from psycopg2 import pool
class DatabaseConnectionPool:
_instance: Optional['DatabaseConnectionPool'] = None
_lock = threading.Lock()
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialize_pool()
return cls._instance
def _initialize_pool(self):
self.connection_pool = psycopg2.pool.ThreadedConnectionPool(
minconn=1,
maxconn=20,
host="localhost",
database="mydb",
user="user",
password="password"
)
def get_connection(self):
return self.connection_pool.getconn()
def put_connection(self, conn):
self.connection_pool.putconn(conn)
@tool
def efficient_db_query(query: str) -> List[dict]:
"""Database query using connection pool."""
pool = DatabaseConnectionPool()
conn = pool.get_connection()
try:
cursor = conn.cursor()
cursor.execute(query)
columns = [desc[0] for desc in cursor.description]
results = [dict(zip(columns, row)) for row in cursor.fetchall()]
return results
finally:
pool.put_connection(conn)
Caching
Cache expensive operations:
from functools import lru_cache
import time
@tool
def expensive_computation(input_data: str) -> dict:
"""Expensive computation with caching."""
@lru_cache(maxsize=100)
def _cached_computation(data: str) -> str:
# Simulate expensive operation
time.sleep(2)
return f"processed_{data}"
result = _cached_computation(input_data)
return {"input": input_data, "result": result}
# Redis-based caching for distributed systems
import redis
import json
redis_client = redis.Redis(host='localhost', port=6379, db=0)
@tool
def cached_api_call(endpoint: str, cache_ttl: int = 300) -> dict:
"""API call with Redis caching."""
cache_key = f"api_call:{endpoint}"
# Check cache first
cached_result = redis_client.get(cache_key)
if cached_result:
return json.loads(cached_result)
# Make API call
import requests
response = requests.get(endpoint)
result = response.json()
# Cache result
redis_client.setex(cache_key, cache_ttl, json.dumps(result))
return result
Batch Operations
Process multiple items efficiently:
@tool
def batch_email_send(emails: List[dict], batch_size: int = 10) -> dict:
"""Send emails in batches for better performance."""
import asyncio
from concurrent.futures import ThreadPoolExecutor
def send_single_email(email_data):
# Single email sending logic
return {"email": email_data["to"], "sent": True}
def send_batch(batch):
with ThreadPoolExecutor(max_workers=5) as executor:
results = list(executor.map(send_single_email, batch))
return results
# Process in batches
all_results = []
for i in range(0, len(emails), batch_size):
batch = emails[i:i + batch_size]
batch_results = send_batch(batch)
all_results.extend(batch_results)
successful = sum(1 for r in all_results if r["sent"])
return {
"total_emails": len(emails),
"successful": successful,
"failed": len(emails) - successful,
"results": all_results
}
Best Practices
Tool Design:
- Each tool should have one clear purpose
- Use type hints for all parameters and return values
- Provide clear docstrings with parameter descriptions
- Handle expected errors gracefully
Performance:
- Pool database connections and HTTP clients
- Cache expensive operations when appropriate
- Group similar operations for efficiency
- Use async/await for I/O-bound operations
Security:
- Validate all inputs before processing
- Implement proper authentication for sensitive operations
- Prevent abuse with rate limiting
- Use environment variables for sensitive data
Monitoring:
- Log tool usage and performance metrics
- Monitor and alert on tool failures
- Track response times and success rates
- Monitor which tools are used most frequently
Next Steps
Tools provide the bridge between AGNT5 and external systems. Learn how they integrate with other components:
- Tasks - Create durable functions that use tools
- Workflows - Orchestrate multiple tools in complex operations
- Agents - Give AI agents access to your tools
- Entities - Build stateful components that use tools
Ready to connect your first external system? Check out the Quick Start Guide to start building with tools.