Entities

Stateful, persistent components that maintain state across restarts and failures with automatic state management and serialized access


Entities are stateful, persistent components that maintain state across restarts and failures.

They provide object-oriented patterns with automatic state management, enabling you to build stateful applications with guaranteed consistency.

Every entity provides serialized access per object instance - no race conditions while maintaining persistent state that survives system failures.

What Entities Enable

Entities bridge the gap between stateless functions and stateful applications:

  • Maintain long-lived state for users, sessions, or business entities
  • Provide serialized access per object instance (no race conditions)
  • Implement entity-specific logic and operations with encapsulation
  • Cache frequently accessed data with automatic persistence
  • Build stateful components like shopping carts, user profiles, or game sessions

Key Features

Persistent State

Object state automatically survives crashes and restarts:

from agnt5 import durable

@durable.object
class UserSession(DurableObject):
    """Persistent user session with automatic state management."""
    
    def __init__(self, user_id: str):
        super().__init__(user_id)
        self.user_id = user_id
        self.login_time = datetime.now()
        self.page_views = 0
        self.preferences = {}
        self.shopping_cart = []
    
    async def track_page_view(self, page: str) -> dict:
        """Track page view - state automatically persists."""
        self.page_views += 1
        self.last_page = page
        self.last_activity = datetime.now()
        
        # State is automatically saved
        await self.save()
        
        return {
            "user_id": self.user_id,
            "page": page,
            "total_views": self.page_views
        }
    
    async def update_preferences(self, preferences: dict) -> dict:
        """Update user preferences with persistence."""
        self.preferences.update(preferences)
        await self.save()
        
        return {"updated": True, "preferences": self.preferences}

# Get or create user session - state persists across requests
session = await UserSession.get_or_create("user_123")
await session.track_page_view("/dashboard")

# Later, even after system restart, state is preserved
session = await UserSession.get_or_create("user_123")
print(session.page_views)  # Maintains previous count

Serialized Access

Only one method executes per object at a time, preventing race conditions:

@durable.object
class BankAccount(DurableObject):
    """Bank account with serialized access to prevent race conditions."""
    
    def __init__(self, account_id: str):
        super().__init__(account_id)
        self.account_id = account_id
        self.balance = 0.0
        self.transaction_history = []
    
    async def deposit(self, amount: float, description: str = "") -> dict:
        """Deposit money - serialized access prevents race conditions."""
        if amount <= 0:
            return {"success": False, "error": "Invalid amount"}
        
        self.balance += amount
        transaction = {
            "type": "deposit",
            "amount": amount,
            "description": description,
            "timestamp": datetime.now().isoformat(),
            "balance_after": self.balance
        }
        self.transaction_history.append(transaction)
        
        await self.save()
        
        return {
            "success": True,
            "transaction": transaction,
            "new_balance": self.balance
        }
    
    async def withdraw(self, amount: float, description: str = "") -> dict:
        """Withdraw money with serialized access."""
        if amount <= 0:
            return {"success": False, "error": "Invalid amount"}
        
        if amount > self.balance:
            return {"success": False, "error": "Insufficient funds"}
        
        self.balance -= amount
        transaction = {
            "type": "withdrawal", 
            "amount": amount,
            "description": description,
            "timestamp": datetime.now().isoformat(),
            "balance_after": self.balance
        }
        self.transaction_history.append(transaction)
        
        await self.save()
        
        return {
            "success": True,
            "transaction": transaction,
            "new_balance": self.balance
        }

# Concurrent operations are automatically serialized
account = await BankAccount.get_or_create("account_456")

# These operations are serialized - no race conditions
deposit_task = account.deposit(100.0, "Salary")
withdraw_task = account.withdraw(50.0, "Coffee")

# Operations execute in order, maintaining consistency
deposit_result = await deposit_task
withdraw_result = await withdraw_task

Automatic Routing

Objects are consistently routed to the same partition for efficiency:

@durable.object
class ChatRoom(DurableObject):
    """Chat room with automatic routing and state management."""
    
    def __init__(self, room_id: str):
        super().__init__(room_id)
        self.room_id = room_id
        self.participants = set()
        self.messages = []
        self.created_at = datetime.now()
    
    async def join_room(self, user_id: str, username: str) -> dict:
        """Add user to chat room."""
        self.participants.add(user_id)
        
        join_message = {
            "type": "system",
            "message": f"{username} joined the room",
            "timestamp": datetime.now().isoformat(),
            "user_id": "system"
        }
        self.messages.append(join_message)
        
        await self.save()
        
        return {
            "joined": True,
            "room_id": self.room_id,
            "participant_count": len(self.participants)
        }
    
    async def send_message(self, user_id: str, username: str, content: str) -> dict:
        """Send message to chat room."""
        if user_id not in self.participants:
            return {"success": False, "error": "User not in room"}
        
        message = {
            "type": "message",
            "content": content,
            "username": username,
            "user_id": user_id,
            "timestamp": datetime.now().isoformat(),
            "message_id": len(self.messages)
        }
        
        self.messages.append(message)
        await self.save()
        
        return {
            "success": True,
            "message": message,
            "total_messages": len(self.messages)
        }
    
    async def get_recent_messages(self, limit: int = 50) -> list:
        """Get recent messages from the room."""
        return self.messages[-limit:] if limit else self.messages

# All operations on the same room_id go to the same partition
room = await ChatRoom.get_or_create("general")
await room.join_room("user_123", "Alice")
await room.send_message("user_123", "Alice", "Hello everyone!")

LRU Caching

Inactive objects are evicted from memory but state persists on disk:

@durable.object
class DocumentCache(DurableObject):
    """Document cache with automatic LRU eviction."""
    
    def __init__(self, doc_id: str):
        super().__init__(doc_id)
        self.doc_id = doc_id
        self.content = ""
        self.metadata = {}
        self.access_count = 0
        self.last_accessed = datetime.now()
    
    async def get_content(self) -> dict:
        """Get document content - updates LRU tracking."""
        self.access_count += 1
        self.last_accessed = datetime.now()
        await self.save()  # Updates LRU position
        
        return {
            "doc_id": self.doc_id,
            "content": self.content,
            "access_count": self.access_count,
            "last_accessed": self.last_accessed.isoformat()
        }
    
    async def update_content(self, new_content: str, metadata: dict = None) -> dict:
        """Update document content."""
        self.content = new_content
        if metadata:
            self.metadata.update(metadata)
        
        self.last_accessed = datetime.now()
        await self.save()
        
        return {"updated": True, "doc_id": self.doc_id}

# Objects are automatically cached in memory and evicted when inactive
# State persists even when evicted from memory
doc1 = await DocumentCache.get_or_create("doc_001")
await doc1.update_content("Important document content")

# Later access loads from persistent state if evicted from memory
doc1_again = await DocumentCache.get_or_create("doc_001")
content = await doc1_again.get_content()  # Content persists

Method Invocation

Call object methods like regular function calls with durability:

@durable.object
class OrderProcessor(DurableObject):
    """Order processor with durable method invocation."""
    
    def __init__(self, order_id: str):
        super().__init__(order_id)
        self.order_id = order_id
        self.status = "created"
        self.items = []
        self.total_amount = 0.0
        self.processing_steps = []
    
    async def add_item(self, item_id: str, quantity: int, price: float) -> dict:
        """Add item to order."""
        item = {
            "item_id": item_id,
            "quantity": quantity,
            "price": price,
            "subtotal": quantity * price
        }
        
        self.items.append(item)
        self.total_amount += item["subtotal"]
        await self.save()
        
        return {"added": True, "item": item, "order_total": self.total_amount}
    
    async def process_payment(self, ctx: DurableContext) -> dict:
        """Process payment with durable context."""
        self.status = "processing_payment"
        self.processing_steps.append("payment_started")
        await self.save()
        
        # Call external payment service durably
        payment_result = await ctx.call("payment_service", "charge", {
            "amount": self.total_amount,
            "order_id": self.order_id
        })
        
        if payment_result["success"]:
            self.status = "payment_complete"
            self.processing_steps.append("payment_complete")
        else:
            self.status = "payment_failed"
            self.processing_steps.append("payment_failed")
        
        await self.save()
        return payment_result
    
    async def fulfill_order(self, ctx: DurableContext) -> dict:
        """Fulfill order with external service calls."""
        if self.status != "payment_complete":
            return {"success": False, "error": "Payment not complete"}
        
        self.status = "fulfilling"
        self.processing_steps.append("fulfillment_started")
        await self.save()
        
        # Call fulfillment service
        fulfillment_result = await ctx.call("fulfillment_service", "ship", {
            "order_id": self.order_id,
            "items": self.items
        })
        
        if fulfillment_result["success"]:
            self.status = "shipped"
            self.processing_steps.append("shipped")
        else:
            self.status = "fulfillment_failed"
            self.processing_steps.append("fulfillment_failed")
        
        await self.save()
        return fulfillment_result

# Method calls are durable and exactly-once
order = await OrderProcessor.get_or_create("order_789")
await order.add_item("widget_1", 2, 25.00)
payment_result = await order.process_payment(ctx)
fulfillment_result = await order.fulfill_order(ctx)

Creating Entities

Basic Entity

Create a simple stateful entity:

from agnt5 import durable

@durable.object
class Counter(DurableObject):
    """Simple counter with persistent state."""
    
    def __init__(self, counter_id: str):
        super().__init__(counter_id)
        self.counter_id = counter_id
        self.value = 0
    
    async def increment(self, amount: int = 1) -> dict:
        """Increment counter value."""
        self.value += amount
        await self.save()
        return {"counter_id": self.counter_id, "value": self.value}
    
    async def decrement(self, amount: int = 1) -> dict:
        """Decrement counter value."""
        self.value -= amount  
        await self.save()
        return {"counter_id": self.counter_id, "value": self.value}
    
    async def reset(self) -> dict:
        """Reset counter to zero."""
        self.value = 0
        await self.save()
        return {"counter_id": self.counter_id, "value": self.value}

# Usage
counter = await Counter.get_or_create("my_counter")
result = await counter.increment(5)
print(f"Counter value: {result['value']}")

Entity with Complex State

Build entities with rich state management:

from typing import Dict, List
from datetime import datetime, timedelta

@durable.object
class GameSession(DurableObject):
    """Game session with complex state management."""
    
    def __init__(self, session_id: str):
        super().__init__(session_id)
        self.session_id = session_id
        self.players = {}
        self.game_state = "waiting"
        self.current_round = 0
        self.scores = {}
        self.game_history = []
        self.created_at = datetime.now()
        self.settings = {
            "max_players": 4,
            "rounds": 10,
            "time_per_round": 60
        }
    
    async def add_player(self, player_id: str, player_name: str) -> dict:
        """Add player to game session."""
        if len(self.players) >= self.settings["max_players"]:
            return {"success": False, "error": "Game is full"}
        
        if self.game_state != "waiting":
            return {"success": False, "error": "Game already in progress"}
        
        self.players[player_id] = {
            "name": player_name,
            "joined_at": datetime.now().isoformat(),
            "ready": False
        }
        self.scores[player_id] = 0
        
        await self.save()
        
        return {
            "success": True,
            "player_id": player_id,
            "player_count": len(self.players)
        }
    
    async def start_game(self) -> dict:
        """Start the game if conditions are met."""
        if len(self.players) < 2:
            return {"success": False, "error": "Need at least 2 players"}
        
        all_ready = all(player["ready"] for player in self.players.values())
        if not all_ready:
            return {"success": False, "error": "Not all players ready"}
        
        self.game_state = "playing"
        self.current_round = 1
        self.game_history.append({
            "event": "game_started",
            "timestamp": datetime.now().isoformat(),
            "players": list(self.players.keys())
        })
        
        await self.save()
        
        return {
            "success": True,
            "game_state": self.game_state,
            "round": self.current_round
        }
    
    async def submit_answer(self, player_id: str, answer: str, round_num: int) -> dict:
        """Submit player answer for current round."""
        if player_id not in self.players:
            return {"success": False, "error": "Player not in game"}
        
        if self.game_state != "playing":
            return {"success": False, "error": "Game not in progress"}
        
        if round_num != self.current_round:
            return {"success": False, "error": "Wrong round number"}
        
        # Process answer and update score
        points = self.calculate_points(answer)
        self.scores[player_id] += points
        
        self.game_history.append({
            "event": "answer_submitted",
            "player_id": player_id,
            "round": round_num,
            "points": points,
            "timestamp": datetime.now().isoformat()
        })
        
        await self.save()
        
        return {
            "success": True,
            "points_earned": points,
            "total_score": self.scores[player_id]
        }
    
    def calculate_points(self, answer: str) -> int:
        """Calculate points for an answer (game logic)."""
        # Simple scoring logic
        return len(answer) * 10 if answer else 0
    
    async def get_leaderboard(self) -> dict:
        """Get current leaderboard."""
        sorted_players = sorted(
            self.scores.items(), 
            key=lambda x: x[1], 
            reverse=True
        )
        
        leaderboard = []
        for rank, (player_id, score) in enumerate(sorted_players, 1):
            leaderboard.append({
                "rank": rank,
                "player_id": player_id,
                "player_name": self.players[player_id]["name"],
                "score": score
            })
        
        return {
            "round": self.current_round,
            "leaderboard": leaderboard
        }

# Complex entity usage
game = await GameSession.get_or_create("game_001")
await game.add_player("player_1", "Alice")
await game.add_player("player_2", "Bob")
await game.start_game()
await game.submit_answer("player_1", "correct answer", 1)
leaderboard = await game.get_leaderboard()

Entity with External Integrations

Entities can integrate with external systems through durable context:

@durable.object
class CustomerProfile(DurableObject):
    """Customer profile with external system integration."""
    
    def __init__(self, customer_id: str):
        super().__init__(customer_id)
        self.customer_id = customer_id
        self.profile_data = {}
        self.preferences = {}
        self.order_history = []
        self.support_tickets = []
        self.last_updated = datetime.now()
    
    async def update_profile(self, ctx: DurableContext, profile_updates: dict) -> dict:
        """Update profile with external validation."""
        # Validate profile data with external service
        validation_result = await ctx.call(
            "validation_service", 
            "validate_profile", 
            profile_updates
        )
        
        if not validation_result["valid"]:
            return {
                "success": False, 
                "errors": validation_result["errors"]
            }
        
        # Update profile
        self.profile_data.update(profile_updates)
        self.last_updated = datetime.now()
        
        # Sync with external CRM
        sync_result = await ctx.call(
            "crm_service",
            "update_customer",
            {
                "customer_id": self.customer_id,
                "profile": self.profile_data
            }
        )
        
        await self.save()
        
        return {
            "success": True,
            "profile": self.profile_data,
            "synced_to_crm": sync_result["success"]
        }
    
    async def add_support_ticket(self, ctx: DurableContext, ticket_data: dict) -> dict:
        """Create support ticket with external system integration."""
        # Create ticket in external support system
        ticket_result = await ctx.call(
            "support_system",
            "create_ticket",
            {
                "customer_id": self.customer_id,
                "subject": ticket_data["subject"],
                "description": ticket_data["description"],
                "priority": ticket_data.get("priority", "normal")
            }
        )
        
        if ticket_result["success"]:
            # Store ticket reference locally
            ticket = {
                "ticket_id": ticket_result["ticket_id"],
                "subject": ticket_data["subject"],
                "status": "open",
                "created_at": datetime.now().isoformat(),
                "external_ticket_id": ticket_result["external_id"]
            }
            
            self.support_tickets.append(ticket)
            await self.save()
            
            return {
                "success": True,
                "ticket": ticket
            }
        else:
            return {
                "success": False,
                "error": ticket_result["error"]
            }

# Entity with external integration
customer = await CustomerProfile.get_or_create("customer_456")
await customer.update_profile(ctx, {"email": "[email protected]"})
await customer.add_support_ticket(ctx, {
    "subject": "Billing Question",
    "description": "I have a question about my bill"
})

Integration Patterns

Entities in Workflows

Use entities within workflows for stateful operations:

from agnt5 import workflow

@workflow
async def order_fulfillment_workflow(order_data: dict) -> dict:
    """Order fulfillment workflow using order entity."""
    order_id = order_data["order_id"]
    
    # Get or create order entity
    order = await OrderProcessor.get_or_create(order_id)
    
    # Add items to order
    for item in order_data["items"]:
        await order.add_item(
            item["item_id"], 
            item["quantity"], 
            item["price"]
        )
    
    # Process payment
    payment_result = await order.process_payment(ctx)
    if not payment_result["success"]:
        return {"success": False, "error": "Payment failed"}
    
    # Fulfill order
    fulfillment_result = await order.fulfill_order(ctx)
    
    return {
        "success": True,
        "order_id": order_id,
        "payment": payment_result,
        "fulfillment": fulfillment_result
    }

Entities with Agents

Combine entities with AI agents for intelligent stateful behavior:

from agnt5 import Agent

@durable.object
class ConversationManager(DurableObject):
    """Manage AI conversation with persistent context."""
    
    def __init__(self, conversation_id: str):
        super().__init__(conversation_id)
        self.conversation_id = conversation_id
        self.messages = []
        self.context = {}
        self.agent_config = {
            "model": "gpt-4o",
            "temperature": 0.7
        }
    
    async def send_message(self, user_message: str, user_id: str = None) -> dict:
        """Send message and get AI response with persistent context."""
        # Add user message to history
        user_msg = {
            "role": "user",
            "content": user_message,
            "timestamp": datetime.now().isoformat(),
            "user_id": user_id
        }
        self.messages.append(user_msg)
        
        # Create agent with conversation history
        agent = Agent(
            name=f"conversation_{self.conversation_id}",
            model=self.agent_config["model"],
            temperature=self.agent_config["temperature"]
        )
        
        # Build context from conversation history
        conversation_context = "\n".join([
            f"{msg['role']}: {msg['content']}" 
            for msg in self.messages[-10:]  # Last 10 messages
        ])
        
        # Get AI response
        ai_response = await agent.run(f"""
        Conversation history:
        {conversation_context}
        
        Please respond to the latest user message.
        """)
        
        # Add AI response to history
        ai_msg = {
            "role": "assistant",
            "content": ai_response,
            "timestamp": datetime.now().isoformat()
        }
        self.messages.append(ai_msg)
        
        await self.save()
        
        return {
            "conversation_id": self.conversation_id,
            "user_message": user_message,
            "ai_response": ai_response,
            "message_count": len(self.messages)
        }
    
    async def get_conversation_summary(self) -> dict:
        """Get summary of conversation."""
        agent = Agent(name="summarizer", model="gpt-4o")
        
        full_conversation = "\n".join([
            f"{msg['role']}: {msg['content']}" 
            for msg in self.messages
        ])
        
        summary = await agent.run(f"""
        Summarize this conversation in 2-3 sentences:
        {full_conversation}
        """)
        
        return {
            "conversation_id": self.conversation_id,
            "message_count": len(self.messages),
            "summary": summary
        }

# Intelligent conversational entity
conversation = await ConversationManager.get_or_create("conv_123")
response1 = await conversation.send_message("Hello, I need help with my account")
response2 = await conversation.send_message("I forgot my password")
summary = await conversation.get_conversation_summary()

Entity Relationships

Model relationships between entities:

@durable.object
class Organization(DurableObject):
    """Organization entity with employee management."""
    
    def __init__(self, org_id: str):
        super().__init__(org_id)
        self.org_id = org_id
        self.name = ""
        self.employee_ids = set()
        self.departments = {}
        self.created_at = datetime.now()
    
    async def add_employee(self, employee_id: str, department: str) -> dict:
        """Add employee to organization."""
        self.employee_ids.add(employee_id)
        
        if department not in self.departments:
            self.departments[department] = set()
        self.departments[department].add(employee_id)
        
        await self.save()
        
        # Update employee's organization reference
        employee = await Employee.get_or_create(employee_id)
        await employee.set_organization(self.org_id, department)
        
        return {
            "success": True,
            "employee_count": len(self.employee_ids),
            "department": department
        }

@durable.object
class Employee(DurableObject):
    """Employee entity with organization relationship."""
    
    def __init__(self, employee_id: str):
        super().__init__(employee_id)
        self.employee_id = employee_id
        self.name = ""
        self.email = ""
        self.organization_id = None
        self.department = ""
        self.hire_date = None
        self.performance_reviews = []
    
    async def set_organization(self, org_id: str, department: str) -> dict:
        """Set employee's organization."""
        self.organization_id = org_id
        self.department = department
        if not self.hire_date:
            self.hire_date = datetime.now().isoformat()
        
        await self.save()
        
        return {
            "employee_id": self.employee_id,
            "organization_id": org_id,
            "department": department
        }
    
    async def add_performance_review(self, review_data: dict) -> dict:
        """Add performance review."""
        review = {
            "review_id": f"review_{len(self.performance_reviews) + 1}",
            "date": datetime.now().isoformat(),
            "rating": review_data["rating"],
            "comments": review_data["comments"],
            "reviewer": review_data["reviewer"]
        }
        
        self.performance_reviews.append(review)
        await self.save()
        
        return {"success": True, "review": review}

# Entity relationships
org = await Organization.get_or_create("acme_corp")
await org.add_employee("emp_001", "engineering")

employee = await Employee.get_or_create("emp_001")
await employee.add_performance_review({
    "rating": 4.5,
    "comments": "Excellent work",
    "reviewer": "manager_001"
})

Common Patterns

Shopping Cart

Classic e-commerce shopping cart with persistence:

@durable.object
class ShoppingCart(DurableObject):
    """Persistent shopping cart for e-commerce."""
    
    def __init__(self, user_id: str):
        super().__init__(user_id)
        self.user_id = user_id
        self.items = {}  # item_id -> {quantity, price, name}
        self.created_at = datetime.now()
        self.last_updated = datetime.now()
        self.applied_coupons = []
        self.estimated_tax = 0.0
        self.estimated_shipping = 0.0
    
    async def add_item(self, item_id: str, quantity: int, price: float, name: str) -> dict:
        """Add item to cart."""
        if item_id in self.items:
            self.items[item_id]["quantity"] += quantity
        else:
            self.items[item_id] = {
                "quantity": quantity,
                "price": price,
                "name": name,
                "added_at": datetime.now().isoformat()
            }
        
        self.last_updated = datetime.now()
        await self.save()
        
        return {
            "item_added": True,
            "item_id": item_id,
            "total_quantity": self.items[item_id]["quantity"],
            "cart_total": self.calculate_total()
        }
    
    async def remove_item(self, item_id: str) -> dict:
        """Remove item from cart."""
        if item_id not in self.items:
            return {"success": False, "error": "Item not in cart"}
        
        removed_item = self.items.pop(item_id)
        self.last_updated = datetime.now()
        await self.save()
        
        return {
            "success": True,
            "removed_item": removed_item,
            "cart_total": self.calculate_total()
        }
    
    async def update_quantity(self, item_id: str, new_quantity: int) -> dict:
        """Update item quantity."""
        if item_id not in self.items:
            return {"success": False, "error": "Item not in cart"}
        
        if new_quantity <= 0:
            return await self.remove_item(item_id)
        
        self.items[item_id]["quantity"] = new_quantity
        self.last_updated = datetime.now()
        await self.save()
        
        return {
            "success": True,
            "item_id": item_id,
            "new_quantity": new_quantity,
            "cart_total": self.calculate_total()
        }
    
    async def apply_coupon(self, ctx: DurableContext, coupon_code: str) -> dict:
        """Apply coupon with external validation."""
        # Validate coupon with external service
        validation_result = await ctx.call(
            "coupon_service",
            "validate_coupon",
            {
                "code": coupon_code,
                "user_id": self.user_id,
                "cart_total": self.calculate_total()
            }
        )
        
        if not validation_result["valid"]:
            return {
                "success": False,
                "error": validation_result["error"]
            }
        
        coupon = {
            "code": coupon_code,
            "discount_amount": validation_result["discount_amount"],
            "discount_type": validation_result["discount_type"],
            "applied_at": datetime.now().isoformat()
        }
        
        self.applied_coupons.append(coupon)
        self.last_updated = datetime.now()
        await self.save()
        
        return {
            "success": True,
            "coupon": coupon,
            "new_total": self.calculate_total_with_discounts()
        }
    
    def calculate_total(self) -> float:
        """Calculate cart subtotal."""
        return sum(
            item["quantity"] * item["price"] 
            for item in self.items.values()
        )
    
    def calculate_total_with_discounts(self) -> float:
        """Calculate total with discounts applied."""
        subtotal = self.calculate_total()
        
        for coupon in self.applied_coupons:
            if coupon["discount_type"] == "percentage":
                subtotal *= (1 - coupon["discount_amount"] / 100)
            elif coupon["discount_type"] == "fixed":
                subtotal -= coupon["discount_amount"]
        
        return max(0, subtotal + self.estimated_tax + self.estimated_shipping)
    
    async def checkout(self, ctx: DurableContext) -> dict:
        """Convert cart to order."""
        if not self.items:
            return {"success": False, "error": "Cart is empty"}
        
        # Create order from cart
        order_data = {
            "user_id": self.user_id,
            "items": list(self.items.values()),
            "subtotal": self.calculate_total(),
            "discounts": self.applied_coupons,
            "tax": self.estimated_tax,
            "shipping": self.estimated_shipping,
            "total": self.calculate_total_with_discounts()
        }
        
        order_result = await ctx.call("order_service", "create_order", order_data)
        
        if order_result["success"]:
            # Clear cart after successful order creation
            self.items = {}
            self.applied_coupons = []
            self.last_updated = datetime.now()
            await self.save()
        
        return order_result

# Shopping cart usage
cart = await ShoppingCart.get_or_create("user_123")
await cart.add_item("widget_1", 2, 25.00, "Premium Widget")
await cart.add_item("gadget_1", 1, 50.00, "Super Gadget")
await cart.apply_coupon(ctx, "SAVE10")
order_result = await cart.checkout(ctx)

Game State Management

Manage complex game state with multiple players:

@durable.object
class MultiplayerGame(DurableObject):
    """Multiplayer game state management."""
    
    def __init__(self, game_id: str):
        super().__init__(game_id)
        self.game_id = game_id
        self.players = {}
        self.game_state = "lobby"
        self.current_turn = None
        self.board_state = self.initialize_board()
        self.move_history = []
        self.winner = None
        self.created_at = datetime.now()
    
    def initialize_board(self) -> dict:
        """Initialize game board."""
        return {
            "size": 8,
            "pieces": {},
            "last_move": None
        }
    
    async def join_game(self, player_id: str, player_name: str) -> dict:
        """Player joins the game."""
        if len(self.players) >= 2:
            return {"success": False, "error": "Game is full"}
        
        if self.game_state != "lobby":
            return {"success": False, "error": "Game already started"}
        
        player_color = "white" if len(self.players) == 0 else "black"
        
        self.players[player_id] = {
            "name": player_name,
            "color": player_color,
            "joined_at": datetime.now().isoformat(),
            "score": 0,
            "ready": False
        }
        
        await self.save()
        
        return {
            "success": True,
            "player_id": player_id,
            "color": player_color,
            "player_count": len(self.players)
        }
    
    async def ready_player(self, player_id: str) -> dict:
        """Mark player as ready to start."""
        if player_id not in self.players:
            return {"success": False, "error": "Player not in game"}
        
        self.players[player_id]["ready"] = True
        await self.save()
        
        # Start game if all players ready
        if len(self.players) == 2 and all(p["ready"] for p in self.players.values()):
            return await self.start_game()
        
        return {"success": True, "waiting_for_players": True}
    
    async def start_game(self) -> dict:
        """Start the game."""
        if len(self.players) != 2:
            return {"success": False, "error": "Need exactly 2 players"}
        
        self.game_state = "playing"
        # White player goes first
        white_player = next(pid for pid, p in self.players.items() if p["color"] == "white")
        self.current_turn = white_player
        
        await self.save()
        
        return {
            "success": True,
            "game_started": True,
            "current_turn": self.current_turn,
            "turn_player": self.players[self.current_turn]["name"]
        }
    
    async def make_move(self, player_id: str, move_data: dict) -> dict:
        """Player makes a move."""
        if player_id not in self.players:
            return {"success": False, "error": "Player not in game"}
        
        if self.game_state != "playing":
            return {"success": False, "error": "Game not in progress"}
        
        if player_id != self.current_turn:
            return {"success": False, "error": "Not your turn"}
        
        # Validate and apply move
        if not self.is_valid_move(move_data):
            return {"success": False, "error": "Invalid move"}
        
        # Apply move to board
        self.apply_move(move_data)
        
        # Record move
        move_record = {
            "player_id": player_id,
            "move_data": move_data,
            "timestamp": datetime.now().isoformat(),
            "move_number": len(self.move_history) + 1
        }
        self.move_history.append(move_record)
        
        # Check for game end conditions
        game_result = self.check_game_end()
        if game_result["game_ended"]:
            self.game_state = "finished"
            self.winner = game_result.get("winner")
        else:
            # Switch turns
            self.current_turn = self.get_next_player(player_id)
        
        await self.save()
        
        return {
            "success": True,
            "move_applied": True,
            "board_state": self.board_state,
            "current_turn": self.current_turn,
            "game_ended": game_result["game_ended"],
            "winner": self.winner
        }
    
    def is_valid_move(self, move_data: dict) -> bool:
        """Validate move according to game rules."""
        # Implement game-specific validation
        return True
    
    def apply_move(self, move_data: dict):
        """Apply move to board state."""
        # Implement game-specific move application
        self.board_state["last_move"] = move_data
    
    def check_game_end(self) -> dict:
        """Check if game has ended."""
        # Implement game-specific end conditions
        return {"game_ended": False}
    
    def get_next_player(self, current_player_id: str) -> str:
        """Get next player's ID."""
        player_ids = list(self.players.keys())
        current_index = player_ids.index(current_player_id)
        next_index = (current_index + 1) % len(player_ids)
        return player_ids[next_index]

# Multiplayer game usage
game = await MultiplayerGame.get_or_create("game_chess_001")
await game.join_game("player_1", "Alice")
await game.join_game("player_2", "Bob")
await game.ready_player("player_1")
result = await game.ready_player("player_2")  # This starts the game
move_result = await game.make_move("player_1", {"from": "e2", "to": "e4"})

Performance Considerations

Optimized State Management

Manage large state efficiently:

@durable.object
class LargeDataProcessor(DurableObject):
    """Handle large datasets efficiently."""
    
    def __init__(self, processor_id: str):
        super().__init__(processor_id)
        self.processor_id = processor_id
        self.batch_size = 1000
        self.processed_count = 0
        self.error_count = 0
        self.checkpoint_interval = 100
    
    async def process_large_dataset(self, ctx: DurableContext, data_source: str) -> dict:
        """Process large dataset with periodic checkpointing."""
        batch_number = 0
        
        while True:
            # Fetch batch from external source
            batch_result = await ctx.call(
                "data_service",
                "get_batch",
                {
                    "source": data_source,
                    "batch_number": batch_number,
                    "batch_size": self.batch_size
                }
            )
            
            if not batch_result["has_data"]:
                break  # No more data
            
            # Process batch
            try:
                processing_result = await self.process_batch(ctx, batch_result["data"])
                self.processed_count += processing_result["processed"]
                self.error_count += processing_result["errors"]
                
                # Checkpoint progress periodically
                if batch_number % self.checkpoint_interval == 0:
                    await self.save()
                    
            except Exception as e:
                self.error_count += len(batch_result["data"])
                await self.save()
                # Continue processing despite errors
            
            batch_number += 1
        
        # Final save
        await self.save()
        
        return {
            "completed": True,
            "total_processed": self.processed_count,
            "total_errors": self.error_count,
            "batches_processed": batch_number
        }
    
    async def process_batch(self, ctx: DurableContext, batch_data: list) -> dict:
        """Process a single batch of data."""
        processed = 0
        errors = 0
        
        for item in batch_data:
            try:
                # Process individual item
                await ctx.call("processing_service", "process_item", item)
                processed += 1
            except Exception:
                errors += 1
        
        return {"processed": processed, "errors": errors}

Memory-Efficient Operations

Handle memory efficiently for large objects:

@durable.object
class StreamProcessor(DurableObject):
    """Process streaming data efficiently."""
    
    def __init__(self, stream_id: str):
        super().__init__(stream_id)
        self.stream_id = stream_id
        self.window_size = 1000
        self.recent_events = []  # Keep only recent events in memory
        self.statistics = {
            "total_events": 0,
            "events_per_hour": {},
            "error_rate": 0.0
        }
    
    async def process_event(self, event_data: dict) -> dict:
        """Process streaming event with bounded memory."""
        current_hour = datetime.now().strftime("%Y-%m-%d-%H")
        
        # Process event
        try:
            processed_event = self.transform_event(event_data)
            
            # Add to recent events (bounded window)
            self.recent_events.append(processed_event)
            if len(self.recent_events) > self.window_size:
                self.recent_events.pop(0)  # Remove oldest
            
            # Update statistics
            self.statistics["total_events"] += 1
            hour_count = self.statistics["events_per_hour"].get(current_hour, 0)
            self.statistics["events_per_hour"][current_hour] = hour_count + 1
            
            # Keep only recent hours (cleanup old data)
            self.cleanup_old_statistics()
            
            # Save state periodically (not every event)
            if self.statistics["total_events"] % 100 == 0:
                await self.save()
            
            return {"success": True, "processed": processed_event}
            
        except Exception as e:
            # Update error statistics
            self.statistics["error_rate"] = self.calculate_error_rate()
            await self.save()
            return {"success": False, "error": str(e)}
    
    def transform_event(self, event_data: dict) -> dict:
        """Transform event data."""
        return {
            "id": event_data["id"],
            "timestamp": datetime.now().isoformat(),
            "processed_data": event_data["data"]
        }
    
    def cleanup_old_statistics(self):
        """Remove statistics older than 24 hours."""
        current_time = datetime.now()
        cutoff_time = current_time - timedelta(hours=24)
        
        keys_to_remove = []
        for hour_key in self.statistics["events_per_hour"]:
            hour_time = datetime.strptime(hour_key, "%Y-%m-%d-%H")
            if hour_time < cutoff_time:
                keys_to_remove.append(hour_key)
        
        for key in keys_to_remove:
            del self.statistics["events_per_hour"][key]
    
    def calculate_error_rate(self) -> float:
        """Calculate current error rate."""
        # Implement error rate calculation
        return 0.05  # 5% error rate example

Best Practices

Entity Design:

  • Define what state belongs to each entity
  • Balance entity size with performance
  • Use clear, descriptive entity and method names
  • Validate state changes to maintain consistency

State Management:

  • Keep only necessary state in memory
  • Save state at appropriate intervals
  • Remove old or unnecessary data
  • Plan for state schema evolution

Performance:

  • Group related state changes
  • Load large data only when needed
  • Cache frequently accessed computations
  • Implement bounded collections for streaming data

Error Handling:

  • Ensure state remains consistent on errors
  • Implement recovery from partial failures
  • Monitor and track entity-level errors
  • Plan for degraded functionality

Next Steps

Entities provide persistent, stateful behavior to your applications. Learn how they integrate with other AGNT5 components:

  • Tasks - Build durable functions that work with entities
  • Workflows - Orchestrate entity operations in complex processes
  • Agents - Add AI capabilities to stateful entities
  • Tools - Connect entities to external systems and services

Ready to build your first stateful entity? Check out the Quick Start Guide to start building persistent applications.