Entities
Stateful, persistent components that maintain state across restarts and failures with automatic state management and serialized access
Entities are stateful, persistent components that maintain state across restarts and failures.
They provide object-oriented patterns with automatic state management, enabling you to build stateful applications with guaranteed consistency.
Every entity provides serialized access per object instance - no race conditions while maintaining persistent state that survives system failures.
What Entities Enable
Entities bridge the gap between stateless functions and stateful applications:
- Maintain long-lived state for users, sessions, or business entities
- Provide serialized access per object instance (no race conditions)
- Implement entity-specific logic and operations with encapsulation
- Cache frequently accessed data with automatic persistence
- Build stateful components like shopping carts, user profiles, or game sessions
Key Features
Persistent State
Object state automatically survives crashes and restarts:
from agnt5 import durable
@durable.object
class UserSession(DurableObject):
"""Persistent user session with automatic state management."""
def __init__(self, user_id: str):
super().__init__(user_id)
self.user_id = user_id
self.login_time = datetime.now()
self.page_views = 0
self.preferences = {}
self.shopping_cart = []
async def track_page_view(self, page: str) -> dict:
"""Track page view - state automatically persists."""
self.page_views += 1
self.last_page = page
self.last_activity = datetime.now()
# State is automatically saved
await self.save()
return {
"user_id": self.user_id,
"page": page,
"total_views": self.page_views
}
async def update_preferences(self, preferences: dict) -> dict:
"""Update user preferences with persistence."""
self.preferences.update(preferences)
await self.save()
return {"updated": True, "preferences": self.preferences}
# Get or create user session - state persists across requests
session = await UserSession.get_or_create("user_123")
await session.track_page_view("/dashboard")
# Later, even after system restart, state is preserved
session = await UserSession.get_or_create("user_123")
print(session.page_views) # Maintains previous count
Serialized Access
Only one method executes per object at a time, preventing race conditions:
@durable.object
class BankAccount(DurableObject):
"""Bank account with serialized access to prevent race conditions."""
def __init__(self, account_id: str):
super().__init__(account_id)
self.account_id = account_id
self.balance = 0.0
self.transaction_history = []
async def deposit(self, amount: float, description: str = "") -> dict:
"""Deposit money - serialized access prevents race conditions."""
if amount <= 0:
return {"success": False, "error": "Invalid amount"}
self.balance += amount
transaction = {
"type": "deposit",
"amount": amount,
"description": description,
"timestamp": datetime.now().isoformat(),
"balance_after": self.balance
}
self.transaction_history.append(transaction)
await self.save()
return {
"success": True,
"transaction": transaction,
"new_balance": self.balance
}
async def withdraw(self, amount: float, description: str = "") -> dict:
"""Withdraw money with serialized access."""
if amount <= 0:
return {"success": False, "error": "Invalid amount"}
if amount > self.balance:
return {"success": False, "error": "Insufficient funds"}
self.balance -= amount
transaction = {
"type": "withdrawal",
"amount": amount,
"description": description,
"timestamp": datetime.now().isoformat(),
"balance_after": self.balance
}
self.transaction_history.append(transaction)
await self.save()
return {
"success": True,
"transaction": transaction,
"new_balance": self.balance
}
# Concurrent operations are automatically serialized
account = await BankAccount.get_or_create("account_456")
# These operations are serialized - no race conditions
deposit_task = account.deposit(100.0, "Salary")
withdraw_task = account.withdraw(50.0, "Coffee")
# Operations execute in order, maintaining consistency
deposit_result = await deposit_task
withdraw_result = await withdraw_task
Automatic Routing
Objects are consistently routed to the same partition for efficiency:
@durable.object
class ChatRoom(DurableObject):
"""Chat room with automatic routing and state management."""
def __init__(self, room_id: str):
super().__init__(room_id)
self.room_id = room_id
self.participants = set()
self.messages = []
self.created_at = datetime.now()
async def join_room(self, user_id: str, username: str) -> dict:
"""Add user to chat room."""
self.participants.add(user_id)
join_message = {
"type": "system",
"message": f"{username} joined the room",
"timestamp": datetime.now().isoformat(),
"user_id": "system"
}
self.messages.append(join_message)
await self.save()
return {
"joined": True,
"room_id": self.room_id,
"participant_count": len(self.participants)
}
async def send_message(self, user_id: str, username: str, content: str) -> dict:
"""Send message to chat room."""
if user_id not in self.participants:
return {"success": False, "error": "User not in room"}
message = {
"type": "message",
"content": content,
"username": username,
"user_id": user_id,
"timestamp": datetime.now().isoformat(),
"message_id": len(self.messages)
}
self.messages.append(message)
await self.save()
return {
"success": True,
"message": message,
"total_messages": len(self.messages)
}
async def get_recent_messages(self, limit: int = 50) -> list:
"""Get recent messages from the room."""
return self.messages[-limit:] if limit else self.messages
# All operations on the same room_id go to the same partition
room = await ChatRoom.get_or_create("general")
await room.join_room("user_123", "Alice")
await room.send_message("user_123", "Alice", "Hello everyone!")
LRU Caching
Inactive objects are evicted from memory but state persists on disk:
@durable.object
class DocumentCache(DurableObject):
"""Document cache with automatic LRU eviction."""
def __init__(self, doc_id: str):
super().__init__(doc_id)
self.doc_id = doc_id
self.content = ""
self.metadata = {}
self.access_count = 0
self.last_accessed = datetime.now()
async def get_content(self) -> dict:
"""Get document content - updates LRU tracking."""
self.access_count += 1
self.last_accessed = datetime.now()
await self.save() # Updates LRU position
return {
"doc_id": self.doc_id,
"content": self.content,
"access_count": self.access_count,
"last_accessed": self.last_accessed.isoformat()
}
async def update_content(self, new_content: str, metadata: dict = None) -> dict:
"""Update document content."""
self.content = new_content
if metadata:
self.metadata.update(metadata)
self.last_accessed = datetime.now()
await self.save()
return {"updated": True, "doc_id": self.doc_id}
# Objects are automatically cached in memory and evicted when inactive
# State persists even when evicted from memory
doc1 = await DocumentCache.get_or_create("doc_001")
await doc1.update_content("Important document content")
# Later access loads from persistent state if evicted from memory
doc1_again = await DocumentCache.get_or_create("doc_001")
content = await doc1_again.get_content() # Content persists
Method Invocation
Call object methods like regular function calls with durability:
@durable.object
class OrderProcessor(DurableObject):
"""Order processor with durable method invocation."""
def __init__(self, order_id: str):
super().__init__(order_id)
self.order_id = order_id
self.status = "created"
self.items = []
self.total_amount = 0.0
self.processing_steps = []
async def add_item(self, item_id: str, quantity: int, price: float) -> dict:
"""Add item to order."""
item = {
"item_id": item_id,
"quantity": quantity,
"price": price,
"subtotal": quantity * price
}
self.items.append(item)
self.total_amount += item["subtotal"]
await self.save()
return {"added": True, "item": item, "order_total": self.total_amount}
async def process_payment(self, ctx: DurableContext) -> dict:
"""Process payment with durable context."""
self.status = "processing_payment"
self.processing_steps.append("payment_started")
await self.save()
# Call external payment service durably
payment_result = await ctx.call("payment_service", "charge", {
"amount": self.total_amount,
"order_id": self.order_id
})
if payment_result["success"]:
self.status = "payment_complete"
self.processing_steps.append("payment_complete")
else:
self.status = "payment_failed"
self.processing_steps.append("payment_failed")
await self.save()
return payment_result
async def fulfill_order(self, ctx: DurableContext) -> dict:
"""Fulfill order with external service calls."""
if self.status != "payment_complete":
return {"success": False, "error": "Payment not complete"}
self.status = "fulfilling"
self.processing_steps.append("fulfillment_started")
await self.save()
# Call fulfillment service
fulfillment_result = await ctx.call("fulfillment_service", "ship", {
"order_id": self.order_id,
"items": self.items
})
if fulfillment_result["success"]:
self.status = "shipped"
self.processing_steps.append("shipped")
else:
self.status = "fulfillment_failed"
self.processing_steps.append("fulfillment_failed")
await self.save()
return fulfillment_result
# Method calls are durable and exactly-once
order = await OrderProcessor.get_or_create("order_789")
await order.add_item("widget_1", 2, 25.00)
payment_result = await order.process_payment(ctx)
fulfillment_result = await order.fulfill_order(ctx)
Creating Entities
Basic Entity
Create a simple stateful entity:
from agnt5 import durable
@durable.object
class Counter(DurableObject):
"""Simple counter with persistent state."""
def __init__(self, counter_id: str):
super().__init__(counter_id)
self.counter_id = counter_id
self.value = 0
async def increment(self, amount: int = 1) -> dict:
"""Increment counter value."""
self.value += amount
await self.save()
return {"counter_id": self.counter_id, "value": self.value}
async def decrement(self, amount: int = 1) -> dict:
"""Decrement counter value."""
self.value -= amount
await self.save()
return {"counter_id": self.counter_id, "value": self.value}
async def reset(self) -> dict:
"""Reset counter to zero."""
self.value = 0
await self.save()
return {"counter_id": self.counter_id, "value": self.value}
# Usage
counter = await Counter.get_or_create("my_counter")
result = await counter.increment(5)
print(f"Counter value: {result['value']}")
Entity with Complex State
Build entities with rich state management:
from typing import Dict, List
from datetime import datetime, timedelta
@durable.object
class GameSession(DurableObject):
"""Game session with complex state management."""
def __init__(self, session_id: str):
super().__init__(session_id)
self.session_id = session_id
self.players = {}
self.game_state = "waiting"
self.current_round = 0
self.scores = {}
self.game_history = []
self.created_at = datetime.now()
self.settings = {
"max_players": 4,
"rounds": 10,
"time_per_round": 60
}
async def add_player(self, player_id: str, player_name: str) -> dict:
"""Add player to game session."""
if len(self.players) >= self.settings["max_players"]:
return {"success": False, "error": "Game is full"}
if self.game_state != "waiting":
return {"success": False, "error": "Game already in progress"}
self.players[player_id] = {
"name": player_name,
"joined_at": datetime.now().isoformat(),
"ready": False
}
self.scores[player_id] = 0
await self.save()
return {
"success": True,
"player_id": player_id,
"player_count": len(self.players)
}
async def start_game(self) -> dict:
"""Start the game if conditions are met."""
if len(self.players) < 2:
return {"success": False, "error": "Need at least 2 players"}
all_ready = all(player["ready"] for player in self.players.values())
if not all_ready:
return {"success": False, "error": "Not all players ready"}
self.game_state = "playing"
self.current_round = 1
self.game_history.append({
"event": "game_started",
"timestamp": datetime.now().isoformat(),
"players": list(self.players.keys())
})
await self.save()
return {
"success": True,
"game_state": self.game_state,
"round": self.current_round
}
async def submit_answer(self, player_id: str, answer: str, round_num: int) -> dict:
"""Submit player answer for current round."""
if player_id not in self.players:
return {"success": False, "error": "Player not in game"}
if self.game_state != "playing":
return {"success": False, "error": "Game not in progress"}
if round_num != self.current_round:
return {"success": False, "error": "Wrong round number"}
# Process answer and update score
points = self.calculate_points(answer)
self.scores[player_id] += points
self.game_history.append({
"event": "answer_submitted",
"player_id": player_id,
"round": round_num,
"points": points,
"timestamp": datetime.now().isoformat()
})
await self.save()
return {
"success": True,
"points_earned": points,
"total_score": self.scores[player_id]
}
def calculate_points(self, answer: str) -> int:
"""Calculate points for an answer (game logic)."""
# Simple scoring logic
return len(answer) * 10 if answer else 0
async def get_leaderboard(self) -> dict:
"""Get current leaderboard."""
sorted_players = sorted(
self.scores.items(),
key=lambda x: x[1],
reverse=True
)
leaderboard = []
for rank, (player_id, score) in enumerate(sorted_players, 1):
leaderboard.append({
"rank": rank,
"player_id": player_id,
"player_name": self.players[player_id]["name"],
"score": score
})
return {
"round": self.current_round,
"leaderboard": leaderboard
}
# Complex entity usage
game = await GameSession.get_or_create("game_001")
await game.add_player("player_1", "Alice")
await game.add_player("player_2", "Bob")
await game.start_game()
await game.submit_answer("player_1", "correct answer", 1)
leaderboard = await game.get_leaderboard()
Entity with External Integrations
Entities can integrate with external systems through durable context:
@durable.object
class CustomerProfile(DurableObject):
"""Customer profile with external system integration."""
def __init__(self, customer_id: str):
super().__init__(customer_id)
self.customer_id = customer_id
self.profile_data = {}
self.preferences = {}
self.order_history = []
self.support_tickets = []
self.last_updated = datetime.now()
async def update_profile(self, ctx: DurableContext, profile_updates: dict) -> dict:
"""Update profile with external validation."""
# Validate profile data with external service
validation_result = await ctx.call(
"validation_service",
"validate_profile",
profile_updates
)
if not validation_result["valid"]:
return {
"success": False,
"errors": validation_result["errors"]
}
# Update profile
self.profile_data.update(profile_updates)
self.last_updated = datetime.now()
# Sync with external CRM
sync_result = await ctx.call(
"crm_service",
"update_customer",
{
"customer_id": self.customer_id,
"profile": self.profile_data
}
)
await self.save()
return {
"success": True,
"profile": self.profile_data,
"synced_to_crm": sync_result["success"]
}
async def add_support_ticket(self, ctx: DurableContext, ticket_data: dict) -> dict:
"""Create support ticket with external system integration."""
# Create ticket in external support system
ticket_result = await ctx.call(
"support_system",
"create_ticket",
{
"customer_id": self.customer_id,
"subject": ticket_data["subject"],
"description": ticket_data["description"],
"priority": ticket_data.get("priority", "normal")
}
)
if ticket_result["success"]:
# Store ticket reference locally
ticket = {
"ticket_id": ticket_result["ticket_id"],
"subject": ticket_data["subject"],
"status": "open",
"created_at": datetime.now().isoformat(),
"external_ticket_id": ticket_result["external_id"]
}
self.support_tickets.append(ticket)
await self.save()
return {
"success": True,
"ticket": ticket
}
else:
return {
"success": False,
"error": ticket_result["error"]
}
# Entity with external integration
customer = await CustomerProfile.get_or_create("customer_456")
await customer.update_profile(ctx, {"email": "[email protected]"})
await customer.add_support_ticket(ctx, {
"subject": "Billing Question",
"description": "I have a question about my bill"
})
Integration Patterns
Entities in Workflows
Use entities within workflows for stateful operations:
from agnt5 import workflow
@workflow
async def order_fulfillment_workflow(order_data: dict) -> dict:
"""Order fulfillment workflow using order entity."""
order_id = order_data["order_id"]
# Get or create order entity
order = await OrderProcessor.get_or_create(order_id)
# Add items to order
for item in order_data["items"]:
await order.add_item(
item["item_id"],
item["quantity"],
item["price"]
)
# Process payment
payment_result = await order.process_payment(ctx)
if not payment_result["success"]:
return {"success": False, "error": "Payment failed"}
# Fulfill order
fulfillment_result = await order.fulfill_order(ctx)
return {
"success": True,
"order_id": order_id,
"payment": payment_result,
"fulfillment": fulfillment_result
}
Entities with Agents
Combine entities with AI agents for intelligent stateful behavior:
from agnt5 import Agent
@durable.object
class ConversationManager(DurableObject):
"""Manage AI conversation with persistent context."""
def __init__(self, conversation_id: str):
super().__init__(conversation_id)
self.conversation_id = conversation_id
self.messages = []
self.context = {}
self.agent_config = {
"model": "gpt-4o",
"temperature": 0.7
}
async def send_message(self, user_message: str, user_id: str = None) -> dict:
"""Send message and get AI response with persistent context."""
# Add user message to history
user_msg = {
"role": "user",
"content": user_message,
"timestamp": datetime.now().isoformat(),
"user_id": user_id
}
self.messages.append(user_msg)
# Create agent with conversation history
agent = Agent(
name=f"conversation_{self.conversation_id}",
model=self.agent_config["model"],
temperature=self.agent_config["temperature"]
)
# Build context from conversation history
conversation_context = "\n".join([
f"{msg['role']}: {msg['content']}"
for msg in self.messages[-10:] # Last 10 messages
])
# Get AI response
ai_response = await agent.run(f"""
Conversation history:
{conversation_context}
Please respond to the latest user message.
""")
# Add AI response to history
ai_msg = {
"role": "assistant",
"content": ai_response,
"timestamp": datetime.now().isoformat()
}
self.messages.append(ai_msg)
await self.save()
return {
"conversation_id": self.conversation_id,
"user_message": user_message,
"ai_response": ai_response,
"message_count": len(self.messages)
}
async def get_conversation_summary(self) -> dict:
"""Get summary of conversation."""
agent = Agent(name="summarizer", model="gpt-4o")
full_conversation = "\n".join([
f"{msg['role']}: {msg['content']}"
for msg in self.messages
])
summary = await agent.run(f"""
Summarize this conversation in 2-3 sentences:
{full_conversation}
""")
return {
"conversation_id": self.conversation_id,
"message_count": len(self.messages),
"summary": summary
}
# Intelligent conversational entity
conversation = await ConversationManager.get_or_create("conv_123")
response1 = await conversation.send_message("Hello, I need help with my account")
response2 = await conversation.send_message("I forgot my password")
summary = await conversation.get_conversation_summary()
Entity Relationships
Model relationships between entities:
@durable.object
class Organization(DurableObject):
"""Organization entity with employee management."""
def __init__(self, org_id: str):
super().__init__(org_id)
self.org_id = org_id
self.name = ""
self.employee_ids = set()
self.departments = {}
self.created_at = datetime.now()
async def add_employee(self, employee_id: str, department: str) -> dict:
"""Add employee to organization."""
self.employee_ids.add(employee_id)
if department not in self.departments:
self.departments[department] = set()
self.departments[department].add(employee_id)
await self.save()
# Update employee's organization reference
employee = await Employee.get_or_create(employee_id)
await employee.set_organization(self.org_id, department)
return {
"success": True,
"employee_count": len(self.employee_ids),
"department": department
}
@durable.object
class Employee(DurableObject):
"""Employee entity with organization relationship."""
def __init__(self, employee_id: str):
super().__init__(employee_id)
self.employee_id = employee_id
self.name = ""
self.email = ""
self.organization_id = None
self.department = ""
self.hire_date = None
self.performance_reviews = []
async def set_organization(self, org_id: str, department: str) -> dict:
"""Set employee's organization."""
self.organization_id = org_id
self.department = department
if not self.hire_date:
self.hire_date = datetime.now().isoformat()
await self.save()
return {
"employee_id": self.employee_id,
"organization_id": org_id,
"department": department
}
async def add_performance_review(self, review_data: dict) -> dict:
"""Add performance review."""
review = {
"review_id": f"review_{len(self.performance_reviews) + 1}",
"date": datetime.now().isoformat(),
"rating": review_data["rating"],
"comments": review_data["comments"],
"reviewer": review_data["reviewer"]
}
self.performance_reviews.append(review)
await self.save()
return {"success": True, "review": review}
# Entity relationships
org = await Organization.get_or_create("acme_corp")
await org.add_employee("emp_001", "engineering")
employee = await Employee.get_or_create("emp_001")
await employee.add_performance_review({
"rating": 4.5,
"comments": "Excellent work",
"reviewer": "manager_001"
})
Common Patterns
Shopping Cart
Classic e-commerce shopping cart with persistence:
@durable.object
class ShoppingCart(DurableObject):
"""Persistent shopping cart for e-commerce."""
def __init__(self, user_id: str):
super().__init__(user_id)
self.user_id = user_id
self.items = {} # item_id -> {quantity, price, name}
self.created_at = datetime.now()
self.last_updated = datetime.now()
self.applied_coupons = []
self.estimated_tax = 0.0
self.estimated_shipping = 0.0
async def add_item(self, item_id: str, quantity: int, price: float, name: str) -> dict:
"""Add item to cart."""
if item_id in self.items:
self.items[item_id]["quantity"] += quantity
else:
self.items[item_id] = {
"quantity": quantity,
"price": price,
"name": name,
"added_at": datetime.now().isoformat()
}
self.last_updated = datetime.now()
await self.save()
return {
"item_added": True,
"item_id": item_id,
"total_quantity": self.items[item_id]["quantity"],
"cart_total": self.calculate_total()
}
async def remove_item(self, item_id: str) -> dict:
"""Remove item from cart."""
if item_id not in self.items:
return {"success": False, "error": "Item not in cart"}
removed_item = self.items.pop(item_id)
self.last_updated = datetime.now()
await self.save()
return {
"success": True,
"removed_item": removed_item,
"cart_total": self.calculate_total()
}
async def update_quantity(self, item_id: str, new_quantity: int) -> dict:
"""Update item quantity."""
if item_id not in self.items:
return {"success": False, "error": "Item not in cart"}
if new_quantity <= 0:
return await self.remove_item(item_id)
self.items[item_id]["quantity"] = new_quantity
self.last_updated = datetime.now()
await self.save()
return {
"success": True,
"item_id": item_id,
"new_quantity": new_quantity,
"cart_total": self.calculate_total()
}
async def apply_coupon(self, ctx: DurableContext, coupon_code: str) -> dict:
"""Apply coupon with external validation."""
# Validate coupon with external service
validation_result = await ctx.call(
"coupon_service",
"validate_coupon",
{
"code": coupon_code,
"user_id": self.user_id,
"cart_total": self.calculate_total()
}
)
if not validation_result["valid"]:
return {
"success": False,
"error": validation_result["error"]
}
coupon = {
"code": coupon_code,
"discount_amount": validation_result["discount_amount"],
"discount_type": validation_result["discount_type"],
"applied_at": datetime.now().isoformat()
}
self.applied_coupons.append(coupon)
self.last_updated = datetime.now()
await self.save()
return {
"success": True,
"coupon": coupon,
"new_total": self.calculate_total_with_discounts()
}
def calculate_total(self) -> float:
"""Calculate cart subtotal."""
return sum(
item["quantity"] * item["price"]
for item in self.items.values()
)
def calculate_total_with_discounts(self) -> float:
"""Calculate total with discounts applied."""
subtotal = self.calculate_total()
for coupon in self.applied_coupons:
if coupon["discount_type"] == "percentage":
subtotal *= (1 - coupon["discount_amount"] / 100)
elif coupon["discount_type"] == "fixed":
subtotal -= coupon["discount_amount"]
return max(0, subtotal + self.estimated_tax + self.estimated_shipping)
async def checkout(self, ctx: DurableContext) -> dict:
"""Convert cart to order."""
if not self.items:
return {"success": False, "error": "Cart is empty"}
# Create order from cart
order_data = {
"user_id": self.user_id,
"items": list(self.items.values()),
"subtotal": self.calculate_total(),
"discounts": self.applied_coupons,
"tax": self.estimated_tax,
"shipping": self.estimated_shipping,
"total": self.calculate_total_with_discounts()
}
order_result = await ctx.call("order_service", "create_order", order_data)
if order_result["success"]:
# Clear cart after successful order creation
self.items = {}
self.applied_coupons = []
self.last_updated = datetime.now()
await self.save()
return order_result
# Shopping cart usage
cart = await ShoppingCart.get_or_create("user_123")
await cart.add_item("widget_1", 2, 25.00, "Premium Widget")
await cart.add_item("gadget_1", 1, 50.00, "Super Gadget")
await cart.apply_coupon(ctx, "SAVE10")
order_result = await cart.checkout(ctx)
Game State Management
Manage complex game state with multiple players:
@durable.object
class MultiplayerGame(DurableObject):
"""Multiplayer game state management."""
def __init__(self, game_id: str):
super().__init__(game_id)
self.game_id = game_id
self.players = {}
self.game_state = "lobby"
self.current_turn = None
self.board_state = self.initialize_board()
self.move_history = []
self.winner = None
self.created_at = datetime.now()
def initialize_board(self) -> dict:
"""Initialize game board."""
return {
"size": 8,
"pieces": {},
"last_move": None
}
async def join_game(self, player_id: str, player_name: str) -> dict:
"""Player joins the game."""
if len(self.players) >= 2:
return {"success": False, "error": "Game is full"}
if self.game_state != "lobby":
return {"success": False, "error": "Game already started"}
player_color = "white" if len(self.players) == 0 else "black"
self.players[player_id] = {
"name": player_name,
"color": player_color,
"joined_at": datetime.now().isoformat(),
"score": 0,
"ready": False
}
await self.save()
return {
"success": True,
"player_id": player_id,
"color": player_color,
"player_count": len(self.players)
}
async def ready_player(self, player_id: str) -> dict:
"""Mark player as ready to start."""
if player_id not in self.players:
return {"success": False, "error": "Player not in game"}
self.players[player_id]["ready"] = True
await self.save()
# Start game if all players ready
if len(self.players) == 2 and all(p["ready"] for p in self.players.values()):
return await self.start_game()
return {"success": True, "waiting_for_players": True}
async def start_game(self) -> dict:
"""Start the game."""
if len(self.players) != 2:
return {"success": False, "error": "Need exactly 2 players"}
self.game_state = "playing"
# White player goes first
white_player = next(pid for pid, p in self.players.items() if p["color"] == "white")
self.current_turn = white_player
await self.save()
return {
"success": True,
"game_started": True,
"current_turn": self.current_turn,
"turn_player": self.players[self.current_turn]["name"]
}
async def make_move(self, player_id: str, move_data: dict) -> dict:
"""Player makes a move."""
if player_id not in self.players:
return {"success": False, "error": "Player not in game"}
if self.game_state != "playing":
return {"success": False, "error": "Game not in progress"}
if player_id != self.current_turn:
return {"success": False, "error": "Not your turn"}
# Validate and apply move
if not self.is_valid_move(move_data):
return {"success": False, "error": "Invalid move"}
# Apply move to board
self.apply_move(move_data)
# Record move
move_record = {
"player_id": player_id,
"move_data": move_data,
"timestamp": datetime.now().isoformat(),
"move_number": len(self.move_history) + 1
}
self.move_history.append(move_record)
# Check for game end conditions
game_result = self.check_game_end()
if game_result["game_ended"]:
self.game_state = "finished"
self.winner = game_result.get("winner")
else:
# Switch turns
self.current_turn = self.get_next_player(player_id)
await self.save()
return {
"success": True,
"move_applied": True,
"board_state": self.board_state,
"current_turn": self.current_turn,
"game_ended": game_result["game_ended"],
"winner": self.winner
}
def is_valid_move(self, move_data: dict) -> bool:
"""Validate move according to game rules."""
# Implement game-specific validation
return True
def apply_move(self, move_data: dict):
"""Apply move to board state."""
# Implement game-specific move application
self.board_state["last_move"] = move_data
def check_game_end(self) -> dict:
"""Check if game has ended."""
# Implement game-specific end conditions
return {"game_ended": False}
def get_next_player(self, current_player_id: str) -> str:
"""Get next player's ID."""
player_ids = list(self.players.keys())
current_index = player_ids.index(current_player_id)
next_index = (current_index + 1) % len(player_ids)
return player_ids[next_index]
# Multiplayer game usage
game = await MultiplayerGame.get_or_create("game_chess_001")
await game.join_game("player_1", "Alice")
await game.join_game("player_2", "Bob")
await game.ready_player("player_1")
result = await game.ready_player("player_2") # This starts the game
move_result = await game.make_move("player_1", {"from": "e2", "to": "e4"})
Performance Considerations
Optimized State Management
Manage large state efficiently:
@durable.object
class LargeDataProcessor(DurableObject):
"""Handle large datasets efficiently."""
def __init__(self, processor_id: str):
super().__init__(processor_id)
self.processor_id = processor_id
self.batch_size = 1000
self.processed_count = 0
self.error_count = 0
self.checkpoint_interval = 100
async def process_large_dataset(self, ctx: DurableContext, data_source: str) -> dict:
"""Process large dataset with periodic checkpointing."""
batch_number = 0
while True:
# Fetch batch from external source
batch_result = await ctx.call(
"data_service",
"get_batch",
{
"source": data_source,
"batch_number": batch_number,
"batch_size": self.batch_size
}
)
if not batch_result["has_data"]:
break # No more data
# Process batch
try:
processing_result = await self.process_batch(ctx, batch_result["data"])
self.processed_count += processing_result["processed"]
self.error_count += processing_result["errors"]
# Checkpoint progress periodically
if batch_number % self.checkpoint_interval == 0:
await self.save()
except Exception as e:
self.error_count += len(batch_result["data"])
await self.save()
# Continue processing despite errors
batch_number += 1
# Final save
await self.save()
return {
"completed": True,
"total_processed": self.processed_count,
"total_errors": self.error_count,
"batches_processed": batch_number
}
async def process_batch(self, ctx: DurableContext, batch_data: list) -> dict:
"""Process a single batch of data."""
processed = 0
errors = 0
for item in batch_data:
try:
# Process individual item
await ctx.call("processing_service", "process_item", item)
processed += 1
except Exception:
errors += 1
return {"processed": processed, "errors": errors}
Memory-Efficient Operations
Handle memory efficiently for large objects:
@durable.object
class StreamProcessor(DurableObject):
"""Process streaming data efficiently."""
def __init__(self, stream_id: str):
super().__init__(stream_id)
self.stream_id = stream_id
self.window_size = 1000
self.recent_events = [] # Keep only recent events in memory
self.statistics = {
"total_events": 0,
"events_per_hour": {},
"error_rate": 0.0
}
async def process_event(self, event_data: dict) -> dict:
"""Process streaming event with bounded memory."""
current_hour = datetime.now().strftime("%Y-%m-%d-%H")
# Process event
try:
processed_event = self.transform_event(event_data)
# Add to recent events (bounded window)
self.recent_events.append(processed_event)
if len(self.recent_events) > self.window_size:
self.recent_events.pop(0) # Remove oldest
# Update statistics
self.statistics["total_events"] += 1
hour_count = self.statistics["events_per_hour"].get(current_hour, 0)
self.statistics["events_per_hour"][current_hour] = hour_count + 1
# Keep only recent hours (cleanup old data)
self.cleanup_old_statistics()
# Save state periodically (not every event)
if self.statistics["total_events"] % 100 == 0:
await self.save()
return {"success": True, "processed": processed_event}
except Exception as e:
# Update error statistics
self.statistics["error_rate"] = self.calculate_error_rate()
await self.save()
return {"success": False, "error": str(e)}
def transform_event(self, event_data: dict) -> dict:
"""Transform event data."""
return {
"id": event_data["id"],
"timestamp": datetime.now().isoformat(),
"processed_data": event_data["data"]
}
def cleanup_old_statistics(self):
"""Remove statistics older than 24 hours."""
current_time = datetime.now()
cutoff_time = current_time - timedelta(hours=24)
keys_to_remove = []
for hour_key in self.statistics["events_per_hour"]:
hour_time = datetime.strptime(hour_key, "%Y-%m-%d-%H")
if hour_time < cutoff_time:
keys_to_remove.append(hour_key)
for key in keys_to_remove:
del self.statistics["events_per_hour"][key]
def calculate_error_rate(self) -> float:
"""Calculate current error rate."""
# Implement error rate calculation
return 0.05 # 5% error rate example
Best Practices
Entity Design:
- Define what state belongs to each entity
- Balance entity size with performance
- Use clear, descriptive entity and method names
- Validate state changes to maintain consistency
State Management:
- Keep only necessary state in memory
- Save state at appropriate intervals
- Remove old or unnecessary data
- Plan for state schema evolution
Performance:
- Group related state changes
- Load large data only when needed
- Cache frequently accessed computations
- Implement bounded collections for streaming data
Error Handling:
- Ensure state remains consistent on errors
- Implement recovery from partial failures
- Monitor and track entity-level errors
- Plan for degraded functionality
Next Steps
Entities provide persistent, stateful behavior to your applications. Learn how they integrate with other AGNT5 components:
- Tasks - Build durable functions that work with entities
- Workflows - Orchestrate entity operations in complex processes
- Agents - Add AI capabilities to stateful entities
- Tools - Connect entities to external systems and services
Ready to build your first stateful entity? Check out the Quick Start Guide to start building persistent applications.