Claude Tool Use in Production: Patterns for Building Reliable AI APIs
Advanced patterns for using Claude tool use in production — strict mode, parallel tool calls, error handling, retry logic, the tool runner, and architectural patterns for building reliable AI-powered APIs.
Getting tool use to work in a demo is straightforward. Getting it to work reliably in production — handling errors gracefully, managing costs, validating inputs, supporting parallel calls — requires a different level of engineering discipline.
This guide covers the production patterns that separate hobby projects from systems that handle real traffic. We'll build up from a naive implementation to a robust, production-grade tool-calling architecture.
The Naive Implementation (And Its Problems)
Most tutorials show something like this:
response = client.messages.create(
    model="claude-opus-4-6",
    tools=tools,
    messages=messages
)

if response.stop_reason == "tool_use":
    for block in response.content:
        if block.type == "tool_use":
            result = my_tools[block.name](**block.input)
            # feed result back...
This works for demos. In production, it fails for several reasons: tool inputs are never validated before execution, tools run one at a time even when they're independent, there are no timeouts or retries, the loop has no iteration or cost bounds, errors surface as raw exceptions instead of recoverable results, and nothing is logged for debugging.
Let's fix all of these.
Pattern 1: Strict Mode for Schema Validation
Claude 4+ supports strict: true on tool definitions, which guarantees the model's tool calls always match your exact schema:
tools = [
    {
        "name": "search_products",
        "description": "Search the product catalog",
        "input_schema": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "Search query"
                },
                "category": {
                    "type": "string",
                    "enum": ["electronics", "clothing", "books", "home"],
                    "description": "Product category filter"
                },
                "max_results": {
                    "type": "integer",
                    "minimum": 1,
                    "maximum": 50,
                    "default": 10
                }
            },
            "required": ["query"],
            "additionalProperties": False
        },
        "strict": True  # Enforce schema exactly
    }
]
With strict: true, Claude will never call this tool with missing required fields or invalid enum values. This eliminates an entire class of runtime errors.
Pattern 2: Typed Tool Execution with Validation
Even with strict mode, validate at the execution boundary. Use Pydantic for clean type checking:
from pydantic import BaseModel, Field, validator
from typing import Optional, Literal

class SearchProductsInput(BaseModel):
    query: str = Field(min_length=1, max_length=500)
    category: Optional[Literal["electronics", "clothing", "books", "home"]] = None
    max_results: int = Field(default=10, ge=1, le=50)

    @validator("query")
    def sanitize_query(cls, v):
        # Strip any injection attempts
        return v.strip().replace(";", "").replace("--", "")

def execute_tool_safe(tool_name: str, raw_input: dict) -> dict:
    """Validates input and returns structured result."""
    validators = {
        "search_products": SearchProductsInput,
        # add other tools here
    }
    if tool_name not in validators:
        return {"error": f"Unknown tool: {tool_name}"}
    try:
        validated = validators[tool_name](**raw_input)
    except Exception as e:
        return {"error": f"Invalid input: {str(e)}"}
    # Execute the actual tool with validated input
    return TOOL_REGISTRY[tool_name](validated)
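To see what the validation boundary buys you, here is a minimal, self-contained sketch of the same Pydantic model rejecting out-of-range input before any tool code runs (the example query values are hypothetical):

```python
from pydantic import BaseModel, Field, ValidationError
from typing import Optional, Literal

class SearchProductsInput(BaseModel):
    query: str = Field(min_length=1, max_length=500)
    category: Optional[Literal["electronics", "clothing", "books", "home"]] = None
    max_results: int = Field(default=10, ge=1, le=50)

# Valid input passes and fills in defaults
ok = SearchProductsInput(query="usb cable")
print(ok.max_results)  # 10

# Out-of-range input raises before any tool code executes
try:
    SearchProductsInput(query="usb cable", max_results=500)
    rejected = False
except ValidationError as e:
    rejected = True
    print("rejected:", len(e.errors()), "error(s)")
```

The key point: a malformed call never reaches your database or downstream API; it is converted into a structured error Claude can react to.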
Pattern 3: Async Parallel Tool Execution
When Claude requests multiple tools simultaneously (which Claude 4 does aggressively), execute them in parallel:
import asyncio
import json
from typing import Any

async def execute_tool_async(
    tool_name: str,
    tool_input: dict,
    tool_use_id: str,
    timeout: float = 30.0
) -> dict:
    """Execute a tool asynchronously with timeout."""
    try:
        result = await asyncio.wait_for(
            ASYNC_TOOL_REGISTRY[tool_name](tool_input),
            timeout=timeout
        )
        return {
            "type": "tool_result",
            "tool_use_id": tool_use_id,
            "content": json.dumps(result)
        }
    except asyncio.TimeoutError:
        return {
            "type": "tool_result",
            "tool_use_id": tool_use_id,
            "content": f"Error: Tool {tool_name} timed out after {timeout}s",
            "is_error": True
        }
    except Exception as e:
        return {
            "type": "tool_result",
            "tool_use_id": tool_use_id,
            "content": f"Error: {str(e)}",
            "is_error": True
        }

async def run_tools_parallel(tool_use_blocks: list) -> list:
    """Run all tool calls from a response in parallel."""
    tasks = [
        execute_tool_async(
            tool_name=block.name,
            tool_input=block.input,
            tool_use_id=block.id
        )
        for block in tool_use_blocks
        if block.type == "tool_use"
    ]
    return await asyncio.gather(*tasks)
This can reduce multi-tool response time by 5-10x when tools are I/O bound.
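You can verify the speedup without any real tools. The sketch below uses two hypothetical stand-in coroutines whose sleeps simulate network latency; gathered together, both finish in roughly the time of one:

```python
import asyncio
import time

# Hypothetical stand-ins for I/O-bound tools; the sleeps simulate network calls
async def fetch_inventory(tool_input: dict) -> dict:
    await asyncio.sleep(0.2)
    return {"in_stock": 42}

async def fetch_pricing(tool_input: dict) -> dict:
    await asyncio.sleep(0.2)
    return {"price_usd": 19.99}

async def main() -> tuple:
    start = time.monotonic()
    # Same shape as run_tools_parallel: gather independent coroutines
    results = await asyncio.gather(fetch_inventory({}), fetch_pricing({}))
    return results, time.monotonic() - start

results, elapsed = asyncio.run(main())
print(results, f"{elapsed:.2f}s")  # both complete in ~0.2s total, not 0.4s
```

The same pattern scales to however many tool_use blocks Claude emits in one turn, since asyncio.gather awaits them concurrently.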
Pattern 4: Retry Logic with Exponential Backoff
Claude's API and your downstream tools will occasionally fail. Retry intelligently:
import time
import random
from functools import wraps

def with_retry(
    max_attempts: int = 3,
    base_delay: float = 1.0,
    exceptions=(Exception,)
):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    if attempt == max_attempts - 1:
                        raise
                    delay = base_delay * (2 ** attempt) + random.uniform(0, 0.5)
                    print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.1f}s")
                    time.sleep(delay)
        return wrapper
    return decorator

@with_retry(max_attempts=3, base_delay=1.0)
def call_claude_api(messages, tools):
    return client.messages.create(
        model="claude-opus-4-6",
        max_tokens=4096,
        tools=tools,
        messages=messages
    )
Pattern 5: Cost-Controlled Agent Loops
Agent loops that aren't bounded can consume thousands of tokens before you notice. Build in hard limits:
class BoundedAgentLoop:
    def __init__(
        self,
        max_iterations: int = 20,
        max_tokens_total: int = 100_000,
        cost_limit_usd: float = 1.0
    ):
        self.max_iterations = max_iterations
        self.max_tokens_total = max_tokens_total
        self.cost_limit_usd = cost_limit_usd
        self.tokens_used = 0
        self.cost_usd = 0.0
        self.iterations = 0

    def check_limits(self, response) -> bool:
        """Returns False if limits exceeded."""
        self.iterations += 1
        self.tokens_used += response.usage.input_tokens + response.usage.output_tokens
        # Accumulate estimated cost (claude-opus-4-6 rates, USD per million tokens)
        self.cost_usd += (response.usage.input_tokens / 1_000_000 * 15) + \
                         (response.usage.output_tokens / 1_000_000 * 75)
        if self.iterations >= self.max_iterations:
            print(f"Max iterations ({self.max_iterations}) reached")
            return False
        if self.tokens_used >= self.max_tokens_total:
            print(f"Token limit ({self.max_tokens_total}) reached")
            return False
        if self.cost_usd >= self.cost_limit_usd:
            print(f"Cost limit (${self.cost_limit_usd}) reached")
            return False
        return True
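The cost arithmetic in check_limits is worth sanity-checking in isolation. A minimal helper, assuming the same $15/$75-per-million-token rates the class uses:

```python
def estimate_cost_usd(input_tokens: int, output_tokens: int,
                      in_rate: float = 15.0, out_rate: float = 75.0) -> float:
    """Estimate USD cost from token counts at per-million-token rates."""
    return input_tokens / 1_000_000 * in_rate + output_tokens / 1_000_000 * out_rate

# 10k input tokens and 2k output tokens: 0.15 + 0.15 = 0.30 USD per call
print(estimate_cost_usd(10_000, 2_000))
```

At these rates a 15-iteration loop averaging those token counts lands around $4.50, which is why a per-task cost ceiling matters and not just an iteration cap.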
Pattern 6: Structured Tool Results
Claude writes better follow-up responses when tool results are structured and descriptive:
# Bad: raw data dump
return str(db_results)

# Good: structured with metadata
return json.dumps({
    "status": "success",
    "count": len(db_results),
    "data": db_results,
    "metadata": {
        "query_time_ms": elapsed,
        "source": "products_db",
        "timestamp": datetime.utcnow().isoformat()
    }
})

# Even better for errors: actionable messages
return json.dumps({
    "status": "error",
    "error_type": "not_found",
    "message": f"No products found matching '{query}' in category '{category}'",
    "suggestion": "Try a broader query or remove the category filter"
})
When Claude sees actionable error messages, it can recover gracefully rather than repeating the same failing call.
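Concretely, the error payload travels back to the model inside a tool_result block on the next user turn. A sketch, with a hypothetical tool_use_id and query:

```python
import json

# Hypothetical failed call; the tool_use_id comes from the model's tool_use block
error_result = {
    "type": "tool_result",
    "tool_use_id": "toolu_01ABC",
    "is_error": True,
    "content": json.dumps({
        "status": "error",
        "error_type": "not_found",
        "message": "No products found matching 'usb-c hub' in category 'books'",
        "suggestion": "Try a broader query or remove the category filter",
    }),
}

# Tool results are sent back to the model as the next user turn
followup = {"role": "user", "content": [error_result]}
print(followup["content"][0]["is_error"])
```

Because the suggestion tells the model what to change, the next tool call it emits is usually the corrected one rather than a retry of the same failure.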
Pattern 7: Tool Use Observability
In production, you need visibility into every tool call:
import json
import logging
import time
import uuid
from dataclasses import dataclass, field
from datetime import datetime

logger = logging.getLogger(__name__)

@dataclass
class ToolCallTrace:
    trace_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    tool_name: str = ""
    input: dict = field(default_factory=dict)
    output: str = ""
    duration_ms: float = 0
    success: bool = True
    error: str = ""
    timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat())

def traced_tool_execution(tool_name: str, tool_input: dict) -> tuple[str, ToolCallTrace]:
    trace = ToolCallTrace(tool_name=tool_name, input=tool_input)
    start = time.monotonic()
    try:
        result = execute_tool_safe(tool_name, tool_input)
        trace.output = json.dumps(result)[:500]  # Truncate for logging
        trace.success = True
    except Exception as e:
        trace.success = False
        trace.error = str(e)
        result = {"error": str(e)}
    finally:
        trace.duration_ms = (time.monotonic() - start) * 1000
        # Send to your observability platform
        logger.info("tool_call", extra=trace.__dict__)
    return json.dumps(result), trace
With structured traces, you can build dashboards showing tool call volume, error rates, and latency percentiles — essential for debugging production issues.
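As a sketch of that aggregation, here is a nearest-rank p95 and error rate computed over a handful of hypothetical trace records shaped like the ones traced_tool_execution emits:

```python
import math

# Hypothetical trace records, shaped like the dataclass above
traces = [
    {"tool_name": "search_products", "duration_ms": 120.0, "success": True},
    {"tool_name": "search_products", "duration_ms": 340.0, "success": True},
    {"tool_name": "search_products", "duration_ms": 95.0, "success": False},
    {"tool_name": "search_products", "duration_ms": 110.0, "success": True},
]

durations = sorted(t["duration_ms"] for t in traces)
error_rate = sum(not t["success"] for t in traces) / len(traces)
# Nearest-rank percentile: index ceil(p * n) - 1 in the sorted list
p95 = durations[math.ceil(0.95 * len(durations)) - 1]
print(f"error_rate={error_rate:.0%} p95={p95}ms")  # error_rate=25% p95=340.0ms
```

In practice you would run this over a time window in your metrics backend rather than in-process, but the shape of the computation is the same.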
Putting It All Together
A production tool-calling system combines all these patterns:
async def production_agent(task: str, user_id: str) -> str:
    loop = BoundedAgentLoop(max_iterations=15, cost_limit_usd=0.50)
    messages = [{"role": "user", "content": task}]
    traces = []

    while True:
        response = call_claude_api(messages, tools)  # with retry

        if not loop.check_limits(response):
            return "Task stopped: resource limit reached"

        if response.stop_reason == "end_turn":
            return next(b.text for b in response.content if hasattr(b, "text"))

        if response.stop_reason == "tool_use":
            messages.append({"role": "assistant", "content": response.content})
            tool_blocks = [b for b in response.content if b.type == "tool_use"]
            results = await run_tools_parallel(tool_blocks)  # parallel execution
            traces.extend(results)
            messages.append({"role": "user", "content": results})
        else:
            # e.g. max_tokens: bail out rather than loop forever
            return f"Task stopped: unexpected stop_reason {response.stop_reason}"
Conclusion
The gap between a demo and a production system is always in the details: the error you didn't handle, the loop that ran forever, the tool that timed out with no recovery path. Claude's tool use is powerful precisely because it's flexible — but flexibility requires you to add the guardrails.
Implement these patterns incrementally. Start with strict mode and input validation, add parallel execution once you're confident in your tool implementations, then layer in observability and cost controls as you scale. Each pattern independently makes your system more reliable; together, they make it production-ready.