SDK Examples
Real-world examples showing how to debug different types of AI agents and workflows.
Chatbot Agent
A conversational agent with memory and context switching:
import opswald
from openai import OpenAI

opswald.init(api_key='your-key')
client = OpenAI()
class ChatBot:
    """Conversational agent with short-term memory, traced end to end.

    Each turn runs inside an opswald trace; intent analysis, context
    retrieval, response generation, and history maintenance are child spans.
    """

    def __init__(self):
        # Alternating {"role", "content"} dicts, capped at 20 entries.
        self.conversation_history = []

    def chat(self, user_message: str) -> str:
        """Handle one conversational turn and return the bot's reply."""
        with opswald.trace('chat-conversation') as t:
            # 1. Analyze user intent
            intent = self._analyze_intent(user_message)

            # 2. Retrieve relevant context
            context = self._get_context(intent, user_message)

            # 3. Generate response
            response = self._generate_response(user_message, context)

            # 4. Update conversation history
            self._update_history(user_message, response)

            return response

    def _analyze_intent(self, message: str) -> dict:
        """Classify the user's intent via an LLM call; returns the parsed JSON dict."""
        import json

        with opswald.span('analyze-intent', kind='llm_call', model='gpt-4o') as s:
            s.set_input({'user_message': message})

            response = client.chat.completions.create(
                model="gpt-4o",
                messages=[{
                    "role": "system",
                    "content": "Classify the user's intent. Return JSON: {\"intent\": \"question|request|casual\", \"confidence\": 0.95, \"entities\": []}"
                }, {
                    "role": "user",
                    "content": message
                }],
                temperature=0.1
            )

            # Parse the JSON safely. eval() on model output would execute
            # arbitrary code — never use it on untrusted text.
            intent = json.loads(response.choices[0].message.content)
            s.set_output(intent)
            s.set_tokens(
                input_tokens=response.usage.prompt_tokens,
                output_tokens=response.usage.completion_tokens
            )

            # Capture decision reasoning
            s.set_metadata({
                'classification_confidence': intent['confidence'],
                'routing_strategy': 'high_confidence' if intent['confidence'] > 0.8 else 'fallback'
            })

            return intent

    def _get_context(self, intent: dict, message: str) -> str:
        """Build reply context: knowledge-base hits for questions, recent
        conversation history otherwise."""
        with opswald.span('retrieve-context', kind='tool_call') as s:
            s.set_input({'intent': intent['intent'], 'entities': intent['entities']})

            if intent['intent'] == 'question':
                # Search knowledge base
                context = f"Retrieved {len(intent['entities'])} relevant documents"
            else:
                # Use conversation history (last 3 exchanges). History entries
                # are dicts, so they must be formatted before joining.
                context = "\n".join(
                    f"{msg['role']}: {msg['content']}"
                    for msg in self.conversation_history[-3:]
                )

            s.set_output({'context_type': intent['intent'], 'context_length': len(context)})

            return context

    def _generate_response(self, message: str, context: str) -> str:
        """Generate the assistant reply using context plus recent history."""
        with opswald.span('generate-response', kind='llm_call', model='gpt-4o') as s:
            system_prompt = f"You are a helpful assistant. Context: {context}"

            s.set_input({
                'user_message': message,
                'context_length': len(context),
                'history_turns': len(self.conversation_history)
            })

            response = client.chat.completions.create(
                model="gpt-4o",
                messages=[
                    {"role": "system", "content": system_prompt},
                    *[{"role": msg["role"], "content": msg["content"]}
                      for msg in self.conversation_history[-3:]],  # Recent history
                    {"role": "user", "content": message}
                ],
                temperature=0.7
            )

            bot_response = response.choices[0].message.content
            s.set_output({'response': bot_response, 'response_length': len(bot_response)})
            s.set_tokens(
                input_tokens=response.usage.prompt_tokens,
                output_tokens=response.usage.completion_tokens
            )

            return bot_response

    def _update_history(self, user_msg: str, bot_msg: str):
        """Append the exchange to history, trimming to the 20 most recent entries."""
        with opswald.span('update-history', kind='custom') as s:
            s.set_input({'history_size_before': len(self.conversation_history)})

            self.conversation_history.extend([
                {"role": "user", "content": user_msg},
                {"role": "assistant", "content": bot_msg}
            ])

            # Keep only recent history
            if len(self.conversation_history) > 20:
                self.conversation_history = self.conversation_history[-20:]

            s.set_output({'history_size_after': len(self.conversation_history)})
# Usage
bot = ChatBot()
response = bot.chat("What's the weather like today?")

import { init, trace, span } from 'opswald';
import OpenAI from 'openai';

init({ apiKey: 'your-key' });
const client = new OpenAI();
interface Intent { intent: 'question' | 'request' | 'casual'; confidence: number; entities: string[];}
interface Message { role: 'user' | 'assistant'; content: string;}
class ChatBot { private conversationHistory: Message[] = [];
async chat(userMessage: string): Promise<string> { return await trace('chat-conversation', {}, async (t) => { // 1. Analyze user intent const intent = await this.analyzeIntent(userMessage);
// 2. Retrieve relevant context const context = await this.getContext(intent, userMessage);
// 3. Generate response const response = await this.generateResponse(userMessage, context);
// 4. Update conversation history await this.updateHistory(userMessage, response);
return response; }); }
private async analyzeIntent(message: string): Promise<Intent> { return await span('analyze-intent', { kind: 'llm_call', model: 'gpt-4o' }, async (s) => { s.setInput({ userMessage: message });
const response = await client.chat.completions.create({ model: "gpt-4o", messages: [{ role: "system", content: "Classify the user's intent. Return JSON: {\"intent\": \"question|request|casual\", \"confidence\": 0.95, \"entities\": []}" }, { role: "user", content: message }], temperature: 0.1 });
const intent = JSON.parse(response.choices[0].message.content!) as Intent; s.setOutput(intent); s.setTokens( response.usage?.prompt_tokens || 0, response.usage?.completion_tokens || 0 );
// Capture decision reasoning s.setMetadata({ classificationConfidence: intent.confidence, routingStrategy: intent.confidence > 0.8 ? 'high_confidence' : 'fallback' });
return intent; }); }
private async getContext(intent: Intent, message: string): Promise<string> { return await span('retrieve-context', { kind: 'tool_call' }, async (s) => { s.setInput({ intent: intent.intent, entities: intent.entities });
let context: string; if (intent.intent === 'question') { // Search knowledge base context = `Retrieved ${intent.entities.length} relevant documents`; } else { // Use conversation history context = this.conversationHistory.slice(-3) .map(msg => `${msg.role}: ${msg.content}`) .join('\n'); }
s.setOutput({ contextType: intent.intent, contextLength: context.length });
return context; }); }
private async generateResponse(message: string, context: string): Promise<string> { return await span('generate-response', { kind: 'llm_call', model: 'gpt-4o' }, async (s) => { const systemPrompt = `You are a helpful assistant. Context: ${context}`;
s.setInput({ userMessage: message, contextLength: context.length, historyTurns: this.conversationHistory.length });
const response = await client.chat.completions.create({ model: "gpt-4o", messages: [ { role: "system", content: systemPrompt }, ...this.conversationHistory.slice(-3), // Recent history { role: "user", content: message } ], temperature: 0.7 });
const botResponse = response.choices[0].message.content!; s.setOutput({ response: botResponse, responseLength: botResponse.length }); s.setTokens( response.usage?.prompt_tokens || 0, response.usage?.completion_tokens || 0 );
return botResponse; }); }
private async updateHistory(userMsg: string, botMsg: string): Promise<void> { await span('update-history', { kind: 'custom' }, async (s) => { s.setInput({ historySizeBefore: this.conversationHistory.length });
this.conversationHistory.push( { role: "user", content: userMsg }, { role: "assistant", content: botMsg } );
// Keep only recent history if (this.conversationHistory.length > 20) { this.conversationHistory = this.conversationHistory.slice(-20); }
s.setOutput({ historySizeAfter: this.conversationHistory.length }); }); }}
// Usage
const bot = new ChatBot();
const response = await bot.chat("What's the weather like today?");

Multi-Agent Research Pipeline
A research agent that coordinates multiple specialized sub-agents:
import opswald
import asyncio
from typing import List, Dict
from openai import OpenAI

opswald.init(api_key='your-key')
client = OpenAI()
class ResearchPipeline:
    """Research agent coordinating question generation, per-question
    sub-agents, synthesis, and report writing inside a single trace."""

    def research_topic(self, topic: str) -> Dict:
        """Run the full pipeline for a topic and return the final report dict."""
        with opswald.trace('research-pipeline') as t:
            # 1. Generate research questions
            questions = self._generate_questions(topic)

            # 2. Research each question
            findings = self._research_questions(questions)

            # 3. Synthesize findings
            synthesis = self._synthesize_findings(findings)

            # 4. Generate final report
            report = self._generate_report(topic, synthesis)

            return report

    def _generate_questions(self, topic: str) -> List[str]:
        """Ask the LLM for research questions; returns the parsed JSON array."""
        import json

        with opswald.span('generate-research-questions', kind='llm_call', model='gpt-4o') as s:
            s.set_input({'topic': topic})

            response = client.chat.completions.create(
                model="gpt-4o",
                messages=[{
                    "role": "system",
                    "content": "Generate 5 specific research questions about the topic. Return as JSON array."
                }, {
                    "role": "user",
                    "content": f"Topic: {topic}"
                }],
                temperature=0.3
            )

            # Parse the JSON array safely. eval() on model output would
            # execute arbitrary code — never use it on untrusted text.
            questions = json.loads(response.choices[0].message.content)
            s.set_output({'num_questions': len(questions), 'questions': questions})
            s.set_tokens(
                input_tokens=response.usage.prompt_tokens,
                output_tokens=response.usage.completion_tokens
            )

            return questions

    def _research_questions(self, questions: List[str]) -> List[Dict]:
        """Research every question, one child span per question."""
        with opswald.span('parallel-research', kind='custom') as s:
            s.set_input({'questions': questions, 'parallel_agents': len(questions)})

            # NOTE: questions are processed sequentially here; the span name
            # mirrors the TypeScript example, which uses Promise.all.
            findings = []
            for i, question in enumerate(questions):
                # Each question gets its own span
                finding = self._research_single_question(question, agent_id=i)
                findings.append(finding)

            s.set_output({'findings_collected': len(findings)})

            return findings

    def _research_single_question(self, question: str, agent_id: int) -> Dict:
        """One sub-agent's work: search sources and analyze them."""
        with opswald.span(f'agent-{agent_id}-research', kind='custom') as s:
            s.set_input({'question': question, 'agent_id': agent_id})

            # Simulate research steps
            search_results = self._web_search(question)
            analysis = self._analyze_sources(search_results)

            finding = {
                'question': question,
                'sources': len(search_results),
                'key_insights': analysis['insights'],
                'confidence': analysis['confidence']
            }

            s.set_output(finding)
            s.set_metadata({
                'research_quality': 'high' if analysis['confidence'] > 0.8 else 'medium',
                'source_diversity': len(set(r['domain'] for r in search_results))
            })

            return finding

    def _web_search(self, query: str) -> List[Dict]:
        """Simulated web search returning placeholder results."""
        with opswald.span('web-search', kind='tool_call') as s:
            s.set_input({'query': query})

            # Simulate web search results
            results = [
                {'title': f'Result {i}', 'url': f'https://example{i}.com', 'domain': f'example{i}.com'}
                for i in range(3)
            ]

            s.set_output({'num_results': len(results)})

            return results

    def _analyze_sources(self, sources: List[Dict]) -> Dict:
        """Simulated source analysis returning canned insights."""
        with opswald.span('analyze-sources', kind='llm_call', model='gpt-4o') as s:
            s.set_input({'num_sources': len(sources)})

            # Simulate analysis
            analysis = {
                'insights': ['Insight 1', 'Insight 2'],
                'confidence': 0.85,
                'bias_detected': False
            }

            s.set_output(analysis)

            return analysis

    def _synthesize_findings(self, findings: List[Dict]) -> Dict:
        """Fold all findings into themes and conclusions via the LLM."""
        with opswald.span('synthesize-findings', kind='llm_call', model='gpt-4o') as s:
            s.set_input({
                'num_findings': len(findings),
                'avg_confidence': sum(f['confidence'] for f in findings) / len(findings)
            })

            response = client.chat.completions.create(
                model="gpt-4o",
                messages=[{
                    "role": "system",
                    "content": "Synthesize research findings into key themes and conclusions."
                }, {
                    "role": "user",
                    "content": str(findings)  # In practice, format this better
                }],
                temperature=0.4
            )

            synthesis = {
                'themes': ['Theme 1', 'Theme 2'],
                'conclusions': response.choices[0].message.content,
                'evidence_strength': 'strong'
            }

            s.set_output(synthesis)
            s.set_tokens(
                input_tokens=response.usage.prompt_tokens,
                output_tokens=response.usage.completion_tokens
            )

            return synthesis

    def _generate_report(self, topic: str, synthesis: Dict) -> Dict:
        """Produce the final report from topic + synthesis."""
        with opswald.span('generate-final-report', kind='llm_call', model='gpt-4o') as s:
            s.set_input({'topic': topic, 'synthesis': synthesis})

            response = client.chat.completions.create(
                model="gpt-4o",
                messages=[{
                    "role": "system",
                    "content": "Generate a comprehensive research report based on the synthesis."
                }, {
                    "role": "user",
                    "content": f"Topic: {topic}\nSynthesis: {synthesis}"
                }],
                temperature=0.2
            )

            # Bind the report text once instead of repeating the long
            # response.choices[0].message.content chain three times.
            full_report = response.choices[0].message.content
            report = {
                'topic': topic,
                'executive_summary': full_report[:200],
                'full_report': full_report,
                'research_quality': 'high',
                'word_count': len(full_report.split())
            }

            s.set_output(report)
            s.set_tokens(
                input_tokens=response.usage.prompt_tokens,
                output_tokens=response.usage.completion_tokens
            )

            return report
# Usage
pipeline = ResearchPipeline()
report = pipeline.research_topic("Impact of AI on education")

import { init, trace, span } from 'opswald';
import OpenAI from 'openai';

init({ apiKey: 'your-key' });
const client = new OpenAI();
interface Finding {
  question: string;
  sources: number;
  keyInsights: string[];
  confidence: number;
}

interface Synthesis {
  themes: string[];
  conclusions: string;
  evidenceStrength: 'weak' | 'medium' | 'strong';
}

/**
 * Research agent coordinating question generation, per-question
 * sub-agents, synthesis, and report writing inside a single trace.
 */
class ResearchPipeline {
  /** Run the full pipeline for a topic and return the final report. */
  async researchTopic(topic: string): Promise<any> {
    return await trace('research-pipeline', {}, async (t) => {
      // 1. Generate research questions
      const questions = await this.generateQuestions(topic);

      // 2. Research each question in parallel
      const findings = await this.researchQuestions(questions);

      // 3. Synthesize findings
      const synthesis = await this.synthesizeFindings(findings);

      // 4. Generate final report
      const report = await this.generateReport(topic, synthesis);

      return report;
    });
  }

  /** Ask the LLM for research questions; returns the parsed JSON array. */
  private async generateQuestions(topic: string): Promise<string[]> {
    return await span('generate-research-questions', { kind: 'llm_call', model: 'gpt-4o' }, async (s) => {
      s.setInput({ topic });

      const response = await client.chat.completions.create({
        model: "gpt-4o",
        messages: [{
          role: "system",
          content: "Generate 5 specific research questions about the topic. Return as JSON array."
        }, {
          role: "user",
          content: `Topic: ${topic}`
        }],
        temperature: 0.3
      });

      const questions = JSON.parse(response.choices[0].message.content!) as string[];
      s.setOutput({ numQuestions: questions.length, questions });
      s.setTokens(
        response.usage?.prompt_tokens || 0,
        response.usage?.completion_tokens || 0
      );

      return questions;
    });
  }

  /** Fan out one sub-agent per question and collect their findings. */
  private async researchQuestions(questions: string[]): Promise<Finding[]> {
    return await span('parallel-research', { kind: 'custom' }, async (s) => {
      s.setInput({ questions, parallelAgents: questions.length });

      // Research questions in parallel
      const findings = await Promise.all(
        questions.map((question, i) => this.researchSingleQuestion(question, i))
      );

      s.setOutput({ findingsCollected: findings.length });

      return findings;
    });
  }

  /** One sub-agent's work: search sources and analyze them. */
  private async researchSingleQuestion(question: string, agentId: number): Promise<Finding> {
    return await span(`agent-${agentId}-research`, { kind: 'custom' }, async (s) => {
      s.setInput({ question, agentId });

      // Simulate research steps
      const searchResults = await this.webSearch(question);
      const analysis = await this.analyzeSources(searchResults);

      const finding: Finding = {
        question,
        sources: searchResults.length,
        keyInsights: analysis.insights,
        confidence: analysis.confidence
      };

      s.setOutput(finding);
      s.setMetadata({
        researchQuality: analysis.confidence > 0.8 ? 'high' : 'medium',
        sourceDiversity: new Set(searchResults.map(r => r.domain)).size
      });

      return finding;
    });
  }

  /** Simulated web search returning placeholder results. */
  private async webSearch(query: string): Promise<any[]> {
    return await span('web-search', { kind: 'tool_call' }, async (s) => {
      s.setInput({ query });

      // Simulate web search results
      const results = Array.from({ length: 3 }, (_, i) => ({
        title: `Result ${i}`,
        url: `https://example${i}.com`,
        domain: `example${i}.com`
      }));

      s.setOutput({ numResults: results.length });

      return results;
    });
  }

  /** Simulated source analysis returning canned insights. */
  private async analyzeSources(sources: any[]): Promise<any> {
    return await span('analyze-sources', { kind: 'llm_call', model: 'gpt-4o' }, async (s) => {
      s.setInput({ numSources: sources.length });

      // Simulate analysis
      const analysis = {
        insights: ['Insight 1', 'Insight 2'],
        confidence: 0.85,
        biasDetected: false
      };

      s.setOutput(analysis);

      return analysis;
    });
  }

  /** Fold all findings into themes and conclusions via the LLM. */
  private async synthesizeFindings(findings: Finding[]): Promise<Synthesis> {
    return await span('synthesize-findings', { kind: 'llm_call', model: 'gpt-4o' }, async (s) => {
      const avgConfidence = findings.reduce((sum, f) => sum + f.confidence, 0) / findings.length;

      s.setInput({ numFindings: findings.length, avgConfidence });

      const response = await client.chat.completions.create({
        model: "gpt-4o",
        messages: [{
          role: "system",
          content: "Synthesize research findings into key themes and conclusions."
        }, {
          role: "user",
          content: JSON.stringify(findings, null, 2)
        }],
        temperature: 0.4
      });

      const synthesis: Synthesis = {
        themes: ['Theme 1', 'Theme 2'],
        conclusions: response.choices[0].message.content!,
        evidenceStrength: 'strong'
      };

      s.setOutput(synthesis);
      s.setTokens(
        response.usage?.prompt_tokens || 0,
        response.usage?.completion_tokens || 0
      );

      return synthesis;
    });
  }

  /** Produce the final report from topic + synthesis. */
  private async generateReport(topic: string, synthesis: Synthesis): Promise<any> {
    return await span('generate-final-report', { kind: 'llm_call', model: 'gpt-4o' }, async (s) => {
      s.setInput({ topic, synthesis });

      const response = await client.chat.completions.create({
        model: "gpt-4o",
        messages: [{
          role: "system",
          content: "Generate a comprehensive research report based on the synthesis."
        }, {
          role: "user",
          content: `Topic: ${topic}\nSynthesis: ${JSON.stringify(synthesis, null, 2)}`
        }],
        temperature: 0.2
      });

      const fullReport = response.choices[0].message.content!;
      const report = {
        topic,
        executiveSummary: fullReport.substring(0, 200),
        fullReport,
        researchQuality: 'high',
        wordCount: fullReport.split(' ').length
      };

      s.setOutput(report);
      s.setTokens(
        response.usage?.prompt_tokens || 0,
        response.usage?.completion_tokens || 0
      );

      return report;
    });
  }
}
// Usage
const pipeline = new ResearchPipeline();
const report = await pipeline.researchTopic("Impact of AI on education");

Error Recovery and Fallback Patterns
Handle failures gracefully with comprehensive error tracking:
import opswald
from openai import OpenAI
from anthropic import Anthropic

opswald.init(api_key='your-key')
openai_client = OpenAI()
anthropic_client = Anthropic()
class RobustAgent:
    """Agent with a layered fallback chain: OpenAI -> Anthropic -> keyword
    matching -> graceful degradation. Every failure is recorded as an
    error span so the full recovery path is visible in the trace."""

    def process_with_fallback(self, prompt: str) -> str:
        """Process a prompt, falling through providers until one succeeds."""
        with opswald.trace('robust-processing') as t:
            try:
                # Primary: OpenAI GPT-4
                return self._try_openai(prompt)
            except Exception as e:
                with opswald.span('primary-failure', kind='error') as s:
                    s.set_error({'provider': 'openai', 'error': str(e)})

                try:
                    # Fallback 1: Anthropic Claude
                    return self._try_anthropic(prompt)
                except Exception as e2:
                    with opswald.span('fallback1-failure', kind='error') as s:
                        s.set_error({'provider': 'anthropic', 'error': str(e2)})

                    try:
                        # Fallback 2: Simplified processing
                        return self._simple_fallback(prompt)
                    except Exception as e3:
                        with opswald.span('final-fallback-failure', kind='error') as s:
                            s.set_error({'provider': 'local', 'error': str(e3)})

                        # Ultimate fallback: graceful degradation
                        return self._graceful_degradation()

    def _try_openai(self, prompt: str) -> str:
        """Primary path: OpenAI with an aggressive timeout."""
        with opswald.span('openai-attempt', kind='llm_call', provider='openai', model='gpt-4o') as s:
            s.set_input({'prompt': prompt, 'strategy': 'primary'})

            response = openai_client.chat.completions.create(
                model="gpt-4o",
                messages=[{"role": "user", "content": prompt}],
                timeout=10  # Aggressive timeout
            )

            result = response.choices[0].message.content
            s.set_output({'response': result, 'success': True})
            s.set_tokens(
                input_tokens=response.usage.prompt_tokens,
                output_tokens=response.usage.completion_tokens
            )

            return result

    def _try_anthropic(self, prompt: str) -> str:
        """First fallback: Anthropic Claude with a looser timeout."""
        with opswald.span('anthropic-attempt', kind='llm_call', provider='anthropic', model='claude-3-sonnet') as s:
            s.set_input({'prompt': prompt, 'strategy': 'fallback1'})

            response = anthropic_client.messages.create(
                model="claude-3-sonnet-20240229",
                max_tokens=1000,
                messages=[{"role": "user", "content": prompt}],
                timeout=15
            )

            result = response.content[0].text
            s.set_output({'response': result, 'success': True})
            s.set_metadata({'fallback_reason': 'openai_failure'})

            return result

    def _simple_fallback(self, prompt: str) -> str:
        """Second fallback: local keyword matching, no LLM at all."""
        with opswald.span('simple-processing', kind='custom') as s:
            s.set_input({'prompt': prompt, 'strategy': 'fallback2'})

            # Simple keyword-based response
            lowered = prompt.lower()
            if 'weather' in lowered:
                result = "I can't access weather data right now, but you can check weather.com"
            elif 'time' in lowered:
                import datetime
                result = f"The current time is {datetime.datetime.now().strftime('%H:%M')}"
            else:
                result = "I'm experiencing some technical difficulties. Please try rephrasing your request."

            s.set_output({'response': result, 'method': 'keyword_matching'})
            s.set_metadata({'fallback_reason': 'all_llm_providers_failed'})

            return result

    def _graceful_degradation(self) -> str:
        """Last resort: a static apology, flagged for alerting."""
        with opswald.span('graceful-degradation', kind='error') as s:
            s.set_input({'strategy': 'ultimate_fallback'})

            result = "I'm currently unable to process your request. Please try again later or contact support."

            s.set_output({'response': result, 'degraded': True})
            s.set_metadata({
                'severity': 'critical',
                'user_impact': 'service_unavailable',
                'alert_required': True
            })

            return result
# Usage with comprehensive error tracking
agent = RobustAgent()
response = agent.process_with_fallback("What's the weather like?")

import { init, trace, span } from 'opswald';
import OpenAI from 'openai';
import Anthropic from '@anthropic-ai/sdk';

init({ apiKey: 'your-key' });
const openaiClient = new OpenAI();
const anthropicClient = new Anthropic();
class RobustAgent { async processWithFallback(prompt: string): Promise<string> { return await trace('robust-processing', {}, async (t) => { try { // Primary: OpenAI GPT-4 return await this.tryOpenAI(prompt); } catch (e) { await span('primary-failure', { kind: 'error' }, async (s) => { s.setError({ provider: 'openai', error: (e as Error).message }); });
try { // Fallback 1: Anthropic Claude return await this.tryAnthropic(prompt); } catch (e2) { await span('fallback1-failure', { kind: 'error' }, async (s) => { s.setError({ provider: 'anthropic', error: (e2 as Error).message }); });
try { // Fallback 2: Simplified processing return await this.simpleFallback(prompt); } catch (e3) { await span('final-fallback-failure', { kind: 'error' }, async (s) => { s.setError({ provider: 'local', error: (e3 as Error).message }); });
// Ultimate fallback: graceful degradation return await this.gracefulDegradation(); } } } }); }
private async tryOpenAI(prompt: string): Promise<string> { return await span('openai-attempt', { kind: 'llm_call', provider: 'openai', model: 'gpt-4o' }, async (s) => { s.setInput({ prompt, strategy: 'primary' });
const response = await openaiClient.chat.completions.create({ model: "gpt-4o", messages: [{ role: "user", content: prompt }], timeout: 10000 // Aggressive timeout });
const result = response.choices[0].message.content!; s.setOutput({ response: result, success: true }); s.setTokens( response.usage?.prompt_tokens || 0, response.usage?.completion_tokens || 0 );
return result; }); }
private async tryAnthropic(prompt: string): Promise<string> { return await span('anthropic-attempt', { kind: 'llm_call', provider: 'anthropic', model: 'claude-3-sonnet' }, async (s) => { s.setInput({ prompt, strategy: 'fallback1' });
const response = await anthropicClient.messages.create({ model: "claude-3-sonnet-20240229", max_tokens: 1000, messages: [{ role: "user", content: prompt }], timeout: 15000 });
const result = response.content[0].type === 'text' ? response.content[0].text : ''; s.setOutput({ response: result, success: true }); s.setMetadata({ fallbackReason: 'openai_failure' });
return result; }); }
private async simpleFallback(prompt: string): Promise<string> { return await span('simple-processing', { kind: 'custom' }, async (s) => { s.setInput({ prompt, strategy: 'fallback2' });
// Simple keyword-based response let result: string; if (prompt.toLowerCase().includes('weather')) { result = "I can't access weather data right now, but you can check weather.com"; } else if (prompt.toLowerCase().includes('time')) { result = `The current time is ${new Date().toLocaleTimeString()}`; } else { result = "I'm experiencing some technical difficulties. Please try rephrasing your request."; }
s.setOutput({ response: result, method: 'keyword_matching' }); s.setMetadata({ fallbackReason: 'all_llm_providers_failed' });
return result; }); }
private async gracefulDegradation(): Promise<string> { return await span('graceful-degradation', { kind: 'error' }, async (s) => { s.setInput({ strategy: 'ultimate_fallback' });
const result = "I'm currently unable to process your request. Please try again later or contact support.";
s.setOutput({ response: result, degraded: true }); s.setMetadata({ severity: 'critical', userImpact: 'service_unavailable', alertRequired: true });
return result; }); }}
// Usage with comprehensive error tracking
const agent = new RobustAgent();
const response = await agent.processWithFallback("What's the weather like?");

Decision Tree Agent
Track complex branching logic and decision points:
import opswald
from typing import Dict, Any
from openai import OpenAI

opswald.init(api_key='your-key')
client = OpenAI()
class DecisionTreeAgent:
    """Customer-service agent whose branching routing logic (urgency x tier)
    is captured span-by-span so every decision path is visible in the trace."""

    def process_customer_request(self, request: str, customer_tier: str) -> Dict[str, Any]:
        """Classify, route, and process a request; returns urgency, routing, and result."""
        with opswald.trace('customer-service-decision-tree') as t:
            # 1. Classify request urgency
            urgency = self._classify_urgency(request)

            # 2. Route based on customer tier and urgency
            routing_decision = self._route_request(customer_tier, urgency, request)

            # 3. Process according to routing decision
            result = self._process_routed_request(request, routing_decision)

            return {
                'urgency': urgency,
                'routing': routing_decision,
                'result': result
            }

    def _classify_urgency(self, request: str) -> Dict[str, Any]:
        """Classify urgency with an LLM call; returns the parsed JSON dict."""
        import json

        with opswald.span('classify-urgency', kind='llm_call', model='gpt-4o') as s:
            s.set_input({'request': request})

            response = client.chat.completions.create(
                model="gpt-4o",
                messages=[{
                    "role": "system",
                    "content": """Classify customer request urgency. Return JSON: {"urgency": "low|medium|high|critical", "confidence": 0.95, "keywords": ["refund", "urgent"], "reasoning": "explanation"}"""
                }, {
                    "role": "user",
                    "content": request
                }],
                temperature=0.1
            )

            # Parse the JSON safely. eval() on model output would execute
            # arbitrary code — never use it on untrusted text.
            urgency_data = json.loads(response.choices[0].message.content)
            s.set_output(urgency_data)
            s.set_tokens(
                input_tokens=response.usage.prompt_tokens,
                output_tokens=response.usage.completion_tokens
            )

            # Log the decision reasoning
            s.set_metadata({
                'classification_confidence': urgency_data['confidence'],
                'decision_factors': urgency_data['keywords'],
                'threshold_met': urgency_data['confidence'] > 0.8
            })

            return urgency_data

    def _route_request(self, tier: str, urgency: Dict, request: str) -> Dict[str, Any]:
        """Wrap the business-rule evaluation in its own span with routing metadata."""
        with opswald.span('route-request', kind='custom') as s:
            s.set_input({
                'customer_tier': tier,
                'urgency_level': urgency['urgency'],
                'confidence': urgency['confidence']
            })

            # Decision tree logic
            routing_decision = self._evaluate_routing_rules(tier, urgency, request)

            s.set_output(routing_decision)
            s.set_metadata({
                'decision_path': routing_decision['path'],
                'business_rule': routing_decision['rule_applied'],
                'escalation_required': routing_decision.get('escalate', False)
            })

            return routing_decision

    def _evaluate_routing_rules(self, tier: str, urgency: Dict, request: str) -> Dict[str, Any]:
        """Evaluate the routing decision tree; branches ordered by priority."""
        with opswald.span('evaluate-business-rules', kind='custom') as s:
            urgency_level = urgency['urgency']
            confidence = urgency['confidence']

            # Decision tree branches (first match wins)
            if urgency_level == 'critical' or (urgency_level == 'high' and confidence > 0.9):
                path = 'immediate-escalation'
                handler = 'human-agent'
                rule = 'CRITICAL_URGENT_RULE'
                sla_hours = 1
            elif tier == 'enterprise' and urgency_level in ['high', 'medium']:
                path = 'enterprise-priority'
                handler = 'senior-agent'
                rule = 'ENTERPRISE_PRIORITY_RULE'
                sla_hours = 4
            elif tier == 'premium' and urgency_level == 'high':
                path = 'premium-fast-track'
                handler = 'dedicated-agent'
                rule = 'PREMIUM_FAST_TRACK_RULE'
                sla_hours = 8
            elif urgency_level == 'low' and confidence > 0.85:
                # NOTE(review): rule name says LOW_CONFIDENCE but the branch
                # requires HIGH confidence in a low-urgency classification.
                path = 'automated-resolution'
                handler = 'chatbot'
                rule = 'LOW_CONFIDENCE_AUTO_RULE'
                sla_hours = 24
            else:
                path = 'standard-queue'
                handler = 'general-agent'
                rule = 'STANDARD_ROUTING_RULE'
                sla_hours = 24

            decision = {
                'path': path,
                'handler': handler,
                'rule_applied': rule,
                'sla_hours': sla_hours,
                'escalate': path == 'immediate-escalation',
                'automated': handler == 'chatbot'
            }

            s.set_input({
                'tier': tier,
                'urgency': urgency_level,
                'confidence': confidence
            })
            s.set_output(decision)

            # Capture decision tree path
            s.set_metadata({
                'decision_branch': f"{tier}_{urgency_level}_{confidence:.2f}",
                'rule_priority': 1 if path == 'immediate-escalation' else 2 if 'premium' in path else 3,
                'business_impact': 'high' if sla_hours <= 4 else 'medium' if sla_hours <= 8 else 'low'
            })

            return decision

    def _process_routed_request(self, request: str, routing: Dict[str, Any]) -> Dict[str, Any]:
        """Dispatch the request to the handler chosen by routing."""
        handler_type = routing['handler']

        if handler_type == 'chatbot':
            return self._automated_response(request, routing)
        elif handler_type in ['human-agent', 'senior-agent', 'dedicated-agent']:
            return self._human_agent_response(request, routing)
        else:
            return self._fallback_response(request, routing)

    def _automated_response(self, request: str, routing: Dict) -> Dict[str, Any]:
        """Resolve automatically with a faster, cheaper model."""
        with opswald.span('automated-resolution', kind='llm_call', model='gpt-3.5-turbo') as s:
            s.set_input({'request': request, 'routing_path': routing['path']})

            response = client.chat.completions.create(
                model="gpt-3.5-turbo",  # Faster model for automated responses
                messages=[{
                    "role": "system",
                    "content": "Provide a helpful automated response to this customer request. Be concise and actionable."
                }, {
                    "role": "user",
                    "content": request
                }],
                temperature=0.3
            )

            automated_response = response.choices[0].message.content
            result = {
                'response': automated_response,
                'method': 'automated',
                'escalation_path': 'Available if needed',
                'resolution_time_minutes': 2
            }

            s.set_output(result)
            s.set_tokens(
                input_tokens=response.usage.prompt_tokens,
                output_tokens=response.usage.completion_tokens
            )

            return result

    def _human_agent_response(self, request: str, routing: Dict) -> Dict[str, Any]:
        """Simulate assignment to a human agent and record queue metadata."""
        with opswald.span('human-agent-assignment', kind='custom') as s:
            s.set_input({'request': request, 'agent_type': routing['handler']})

            # Simulate human agent assignment. NOTE: hash() is salted per
            # process, so the ticket id is not stable across runs.
            result = {
                'status': 'assigned_to_human',
                'agent_type': routing['handler'],
                'ticket_id': f"TKT-{hash(request) % 10000:04d}",
                'sla_hours': routing['sla_hours'],
                'escalation_level': routing['path']
            }

            s.set_output(result)
            s.set_metadata({
                'queue_priority': 'high' if routing['escalate'] else 'normal',
                'estimated_resolution_hours': routing['sla_hours'],
                'resource_cost': 'high' if routing['handler'] == 'senior-agent' else 'medium'
            })

            return result

    def _fallback_response(self, request: str, routing: Dict) -> Dict[str, Any]:
        """Record a routing failure and park the request for manual review."""
        with opswald.span('fallback-processing', kind='error') as s:
            s.set_error({'routing_failure': 'Unknown handler type', 'handler': routing.get('handler', 'unknown')})

            result = {
                'status': 'routing_error',
                'fallback': 'Assigned to general queue',
                'manual_review_required': True
            }

            s.set_output(result)

            return result
# Usage
agent = DecisionTreeAgent()
result = agent.process_customer_request(
    "URGENT: My payment failed and I need to access my account immediately for a business meeting!",
    customer_tier="enterprise"
)

# View the complete decision path in the dashboard
print(f"Request routed to: {result['routing']['handler']}")
print(f"Decision path: {result['routing']['path']}")

These examples show how to use Opswald SDKs to debug complex AI workflows by capturing every decision point, error-recovery path, and data transformation. The traces give you complete visibility into why your agents behaved the way they did.