Multi-Step Workflows

Learn how to use Proxy in real-world scenarios, from simple single API calls to complex multi-step workflows with automatic trace and span generation.

Understanding Traces and Spans

  • Trace: A complete request journey through your application
  • Span: Individual operations within a trace (e.g., each AI API call)
  • Automatic Generation: Proxy creates these automatically without manual instrumentation

Headers Reference

Proxy relies heavily on request headers to create and update traces and spans in Adaline. Refer to the Headers Reference for details beyond the examples below.

Example 1: Simple RAG Pipeline

A Retrieval-Augmented Generation (RAG) system that combines embedding generation, vector search, and chat completion in a single trace.
import os
import uuid
from openai import OpenAI

class RAGPipeline:
    """Retrieval-Augmented Generation pipeline traced end-to-end in Adaline.

    Each query runs three steps -- question embedding, vector search, and
    chat completion -- and records each step as a span inside a single
    trace identified by one shared trace reference ID.
    """

    def __init__(self):
        # Route all OpenAI calls through the Adaline gateway so the proxy
        # can generate traces and spans automatically.
        self.client = OpenAI(
            api_key=os.getenv("OPENAI_API_KEY"),
            base_url="https://gateway.adaline.ai/v1/openai/"
        )

        # Required on every proxied request; per-step trace/span headers
        # are merged on top of these.
        self.base_headers = {
            "adaline-api-key": os.getenv("ADALINE_API_KEY"),
            "adaline-project-id": os.getenv("ADALINE_PROJECT_ID"),
            "adaline-prompt-id": os.getenv("ADALINE_PROMPT_ID")
        }

    def query(self, user_question: str, user_id: str) -> str:
        """Answer `user_question` via the RAG workflow.

        Args:
            user_question: The natural-language question to answer.
            user_id: Caller identifier, recorded as a trace attribute.

        Returns:
            The model-generated answer string.
        """
        # One trace ID shared by all three spans ties them into one trace.
        trace_id = str(uuid.uuid4())

        # Step 1: Generate embedding for user question
        query_embedding = self._generate_embedding(
            user_question, trace_id, user_id, "query-embedding"
        )

        # Step 2: Simulate vector search
        relevant_docs = self._vector_search(
            query_embedding, trace_id, user_id, "vector-search"
        )

        # Step 3: Generate final response using retrieved context.
        # This last span also flips the trace status to "success".
        response = self._generate_response(
            user_question, relevant_docs, trace_id, user_id, "response-generation"
        )

        return response

    def _generate_embedding(self, text: str, trace_id: str, user_id: str, span_name: str) -> list:
        """Embed `text` and record the call as a span in the trace."""
        headers = self.base_headers.copy()
        headers.update({
            "adaline-trace-status": "pending",
            "adaline-trace-reference-id": trace_id,
            "adaline-trace-name": "rag-pipeline",
            "adaline-span-name": span_name,
            # Plain string literals (no f-prefix): the original f-string
            # treated the single braces as a replacement field, so the value
            # was never valid JSON.
            "adaline-trace-tags": '[{"operation": "create", "tag": "production-v1.3"}]',
            # NOTE(review): user_id is interpolated into JSON text; it must
            # not contain quotes or backslashes (use json.dumps otherwise).
            "adaline-trace-attributes": f'[{{"operation": "create", "key": "user_id", "value": "{user_id}"}}]',
            "adaline-span-tags": '["production-v1.3"]'
        })

        response = self.client.embeddings.create(
            model="text-embedding-3-small",
            input=text,
            extra_headers=headers
        )

        return response.data[0].embedding

    def _vector_search(self, embedding: list, trace_id: str, user_id: str, span_name: str) -> str:
        """Return retrieval context for `embedding` (simulated).

        In a real implementation this would query your vector database; the
        prepared headers show how that call's span would be tagged.
        """
        headers = self.base_headers.copy()
        headers.update({
            "adaline-trace-status": "pending",
            "adaline-trace-reference-id": trace_id,
            "adaline-trace-name": "rag-pipeline",
            "adaline-span-name": span_name,
            "adaline-trace-attributes": f'[{{"operation": "create", "key": "user_id", "value": "{user_id}"}}]',
            "adaline-span-tags": '["production-v1.3"]'
        })

        # Mock relevant documents (replace with actual vector search)
        return "Relevant context: AI models are trained on large datasets and use neural networks..."

    def _generate_response(self, question: str, context: str, trace_id: str, user_id: str, span_name: str) -> str:
        """Generate the final answer from `question` plus retrieved `context`."""
        headers = self.base_headers.copy()
        headers.update({
            # Final step: mark the whole trace as successful.
            "adaline-trace-status": "success",
            "adaline-trace-reference-id": trace_id,
            "adaline-trace-name": "rag-pipeline",
            "adaline-span-name": span_name,
            "adaline-trace-attributes": f'[{{"operation": "create", "key": "user_id", "value": "{user_id}"}}]',
            "adaline-span-tags": '["production-v1.3"]',
            "adaline-span-variables": f'{{"user_id": {{"modality": "text", "value": "{user_id}"}}}}'
        })

        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": f"Answer the question using this context: {context}"},
                {"role": "user", "content": question}
            ],
            extra_headers=headers
        )

        return response.choices[0].message.content

# Usage
pipeline = RAGPipeline()
print(pipeline.query("How do neural networks learn?", user_id="user123"))

Example 2: Multi-Step Content Generation

A content generation workflow that uses multiple embedding calls and chat completions with span attributes and variables tracking.
import json
import os
import uuid

from anthropic import Anthropic
from openai import OpenAI

class ContentGenerator:
    """Multi-step article generator traced end-to-end in Adaline.

    The workflow runs three phases under a single trace: research
    (several OpenAI embedding calls), synthesis (an Anthropic chat
    completion), and review (an OpenAI chat completion). Every phase is
    recorded as one or more spans sharing the same trace reference ID.
    """

    def __init__(self):
        # Both providers are routed through the Adaline gateway so the
        # proxy can observe each call and build traces/spans automatically.
        self.openai_client = OpenAI(
            api_key=os.getenv("OPENAI_API_KEY"),
            base_url="https://gateway.adaline.ai/v1/openai/"
        )

        self.anthropic_client = Anthropic(
            api_key=os.getenv("ANTHROPIC_API_KEY"),
            base_url="https://gateway.adaline.ai/v1/anthropic/"
        )

        # Required on every proxied request; per-step trace/span headers
        # are merged on top of these.
        self.base_headers = {
            "adaline-api-key": os.getenv("ADALINE_API_KEY"),
            "adaline-project-id": os.getenv("ADALINE_PROJECT_ID"),
            "adaline-prompt-id": os.getenv("ADALINE_PROMPT_ID")
        }

    def generate_article(self, topic: str, target_audience: str, user_id: str) -> dict:
        """Research, draft, and review an article about `topic`.

        Args:
            topic: Subject of the article.
            target_audience: Audience the article should be written for.
            user_id: Caller identifier carried in workflow metadata.

        Returns:
            A dict with the research data, draft content, final content,
            and a workflow ID derived from the trace ID.
        """
        # One trace ID shared by every span in this workflow.
        trace_id = str(uuid.uuid4())
        workflow_data = {
            "topic": topic,
            "target_audience": target_audience,
            "user_id": user_id,
            "workflow_id": f"content-gen-{trace_id[:8]}"
        }

        # Step 1: Research phase - multiple embedding calls for topic analysis
        research_data = self._research_topic(topic, trace_id, workflow_data)

        # Step 2: Content synthesis using research
        content = self._synthesize_content(research_data, trace_id, workflow_data)

        # Step 3: Review and enhancement; this call also closes the trace
        final_content = self._review_content(content, trace_id, workflow_data)

        return {
            "topic": topic,
            "research": research_data,
            "content": content,
            "final_content": final_content,
            "workflow_id": workflow_data["workflow_id"]
        }

    def _research_topic(self, topic: str, trace_id: str, workflow_data: dict) -> dict:
        """Embed three research questions about `topic`, one span each."""
        research_queries = [
            f"What are the key concepts in {topic}?",
            f"What are recent developments in {topic}?",
            f"What challenges exist in {topic}?"
        ]

        embeddings = []
        for i, query in enumerate(research_queries):
            headers = self.base_headers.copy()
            headers.update({
                "adaline-trace-status": "pending",
                "adaline-trace-reference-id": trace_id,
                "adaline-trace-name": "content-generation",
                "adaline-trace-tags": '["production-v1.3"]',
                # Built with json.dumps so the header is always valid JSON
                # (the original hand-written f-string had unbalanced braces
                # and was a syntax error).
                "adaline-trace-attributes": json.dumps([
                    {"operation": "create", "key": "query_type", "value": "research"},
                    {"operation": "create", "key": "query_index", "value": str(i + 1)},
                    {"operation": "create", "key": "total_queries", "value": str(len(research_queries))}
                ]),
                "adaline-span-name": f"research-embedding-{i+1}",
            })

            response = self.openai_client.embeddings.create(
                model="text-embedding-3-small",
                input=query,
                extra_headers=headers
            )

            embeddings.append({
                "query": query,
                "embedding": response.data[0].embedding
            })

        return {"queries": research_queries, "embeddings": embeddings}

    def _synthesize_content(self, research_data: dict, trace_id: str, workflow_data: dict) -> str:
        """Draft the article with Anthropic, guided by the research questions."""
        headers = self.base_headers.copy()
        headers.update({
            "adaline-trace-status": "pending",
            "adaline-trace-reference-id": trace_id,
            "adaline-trace-name": "content-generation",
            "adaline-span-name": "content-synthesis",
            "adaline-trace-attributes": json.dumps([
                {"operation": "create", "key": "phase", "value": "synthesis"},
                {"operation": "create", "key": "research_queries_count", "value": str(len(research_data["queries"]))}
            ]),
            # json.dumps escapes quotes/newlines in user-supplied values,
            # keeping the header value valid JSON.
            "adaline-span-variables": json.dumps({
                "topic": {"modality": "text", "value": workflow_data["topic"]},
                "target_audience": {"modality": "text", "value": workflow_data["target_audience"]}
            })
        })

        research_context = "\n".join([f"- {query}" for query in research_data["queries"]])

        response = self.anthropic_client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=2000,
            messages=[
                {"role": "user", "content": f"Write a comprehensive article about {workflow_data['topic']} for {workflow_data['target_audience']}. Base it on these research questions:\n{research_context}"}
            ],
            extra_headers=headers
        )

        return response.content[0].text

    def _review_content(self, content: str, trace_id: str, workflow_data: dict) -> str:
        """Review/enhance `content` with OpenAI and mark the trace successful."""
        headers = self.base_headers.copy()
        headers.update({
            # Final step: mark the whole trace as successful.
            "adaline-trace-status": "success",
            "adaline-trace-reference-id": trace_id,
            "adaline-trace-name": "content-generation",
            "adaline-span-name": "content-review",
            "adaline-trace-attributes": json.dumps([
                {"operation": "create", "key": "phase", "value": "review"},
                {"operation": "create", "key": "content_length", "value": str(len(content))},
                {"operation": "create", "key": "review_type", "value": "enhancement"}
            ]),
            # The article body can contain quotes and newlines, so it must be
            # JSON-escaped rather than interpolated into an f-string; dumps
            # encodes newlines as \n, keeping the header value well-formed.
            "adaline-span-variables": json.dumps({
                "content": {"modality": "text", "value": content}
            })
        })

        response = self.openai_client.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": "Review and enhance the content for clarity, engagement, and accuracy."},
                {"role": "user", "content": content}
            ],
            extra_headers=headers
        )

        return response.choices[0].message.content

# Usage
content_generator = ContentGenerator()
article = content_generator.generate_article(
    "Machine Learning in Healthcare",
    "healthcare professionals",
    user_id="user456",
)
print(f"Generated article: {article['workflow_id']}")

Example 3: Conversational Agent with Tools

A conversational agent that uses tool calling with trace status updates and comprehensive error handling.
import os
import uuid
import json
from openai import OpenAI

class ConversationalAgent:
    """Tool-calling conversational agent traced in Adaline.

    Each chat turn runs under one trace: query processing, optional tool
    execution, final response generation, and a closing status update are
    recorded as spans sharing a single trace reference ID.
    """

    def __init__(self):
        # Route OpenAI traffic through the Adaline gateway so the proxy
        # generates traces and spans automatically.
        self.client = OpenAI(
            api_key=os.getenv("OPENAI_API_KEY"),
            base_url="https://gateway.adaline.ai/v1/openai/"
        )

        # Required on every proxied request; per-step trace/span headers
        # are merged on top of these.
        self.base_headers = {
            "adaline-api-key": os.getenv("ADALINE_API_KEY"),
            "adaline-project-id": os.getenv("ADALINE_PROJECT_ID"),
            "adaline-prompt-id": os.getenv("ADALINE_PROMPT_ID")
        }

        # OpenAI function-calling schemas offered to the model on every turn.
        self.tools = [
            {
                "type": "function",
                "function": {
                    "name": "get_weather",
                    "description": "Get current weather for a location",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "location": {"type": "string", "description": "City name"}
                        },
                        "required": ["location"]
                    }
                }
            },
            {
                "type": "function",
                "function": {
                    "name": "calculate",
                    "description": "Perform mathematical calculations",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "expression": {"type": "string", "description": "Math expression to evaluate"}
                        },
                        "required": ["expression"]
                    }
                }
            }
        ]

    def chat(self, user_message: str, user_id: str) -> str:
        """Answer `user_message`, invoking tools when the model requests them.

        Returns the assistant's reply. On any failure the trace is marked
        failed and an apologetic string containing the error is returned
        instead of raising.
        """
        trace_id = str(uuid.uuid4())

        try:
            # Step 1: Initial query processing with tool selection
            response = self._process_query(user_message, trace_id, user_id)

            # Step 2: Handle tool calls if needed
            if response.choices[0].message.tool_calls:
                tool_results = self._execute_tools(response.choices[0].message.tool_calls, trace_id, user_id)

                # Step 3: Generate final response with tool results
                final_response = self._generate_final_response(
                    user_message, response.choices[0].message, tool_results, trace_id, user_id
                )

                # Update trace status to completed
                self._update_trace_status(trace_id, user_id, "completed", "success")

                return final_response
            else:
                # Direct response without tools
                self._update_trace_status(trace_id, user_id, "completed", "success")
                return response.choices[0].message.content

        except Exception as e:
            # Mark the trace failed and degrade gracefully to an error reply.
            self._update_trace_status(trace_id, user_id, "failed", f"error: {str(e)}")
            return f"I encountered an error: {str(e)}"

    def _process_query(self, user_message: str, trace_id: str, user_id: str):
        """Send the user message to the model with tools enabled.

        Returns the raw chat-completion response so the caller can inspect
        `tool_calls` on the first choice.
        """
        headers = self.base_headers.copy()
        headers.update({
            "adaline-trace-reference-id": trace_id,
            "adaline-trace-name": "conversational-agent",
            "adaline-trace-status": "in_progress",
            "adaline-span-name": "query-processing",
            "adaline-span-tags": '["tool_selection", "intent_analysis"]',
            "adaline-span-attributes": f'{{"user_id": "{user_id}", "message_length": {len(user_message)}, "tools_available": {len(self.tools)}}}'
        })

        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": "You are a helpful assistant with access to tools. Use them when appropriate."},
                {"role": "user", "content": user_message}
            ],
            tools=self.tools,
            tool_choice="auto",
            extra_headers=headers
        )

        return response

    def _execute_tools(self, tool_calls, trace_id: str, user_id: str) -> list:
        """Execute each requested tool locally and collect its result.

        NOTE(review): the headers below are prepared per tool call, but the
        simulated execution makes no gateway request, so no span is emitted
        here — attach the headers to the real tool-backed request in
        production.
        """
        results = []

        for i, tool_call in enumerate(tool_calls):
            headers = self.base_headers.copy()
            headers.update({
                "adaline-trace-reference-id": trace_id,
                "adaline-trace-name": "conversational-agent",
                "adaline-span-name": f"tool-execution-{i+1}",
                "adaline-span-tags": f'["tool_call", "{tool_call.function.name}"]',
                "adaline-span-attributes": f'{{"tool_name": "{tool_call.function.name}", "tool_call_id": "{tool_call.id}", "execution_order": {i+1}}}'
            })

            # Simulate tool execution
            if tool_call.function.name == "get_weather":
                args = json.loads(tool_call.function.arguments)
                result = f"Weather in {args['location']}: 72°F, sunny"
            elif tool_call.function.name == "calculate":
                args = json.loads(tool_call.function.arguments)
                try:
                    # SECURITY: eval on model-supplied input — replace with a
                    # safe expression evaluator before production use.
                    result = str(eval(args['expression']))
                except Exception:
                    # Narrowed from a bare `except:` so SystemExit and
                    # KeyboardInterrupt are no longer swallowed.
                    result = "Error in calculation"
            else:
                result = "Tool not implemented"

            results.append({
                "tool_call_id": tool_call.id,
                "result": result
            })

        return results

    def _generate_final_response(self, original_message: str, assistant_message, tool_results: list, trace_id: str, user_id: str) -> str:
        """Produce the natural-language reply that folds in the tool results."""
        headers = self.base_headers.copy()
        headers.update({
            "adaline-trace-reference-id": trace_id,
            "adaline-trace-name": "conversational-agent",
            "adaline-span-name": "response-generation",
            "adaline-span-tags": '["final_response", "tool_integration"]',
            "adaline-span-attributes": f'{{"tools_used": {len(tool_results)}, "response_type": "tool_assisted"}}'
        })

        # Replay the conversation: user turn, the assistant turn that
        # requested the tools, then one "tool" message per result.
        messages = [
            {"role": "system", "content": "You are a helpful assistant. Provide a natural response using the tool results."},
            {"role": "user", "content": original_message},
            assistant_message,
        ]

        for tool_result in tool_results:
            messages.append({
                "role": "tool",
                "tool_call_id": tool_result["tool_call_id"],
                "content": tool_result["result"]
            })

        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=messages,
            extra_headers=headers
        )

        return response.choices[0].message.content

    def _update_trace_status(self, trace_id: str, user_id: str, status: str, details: str):
        """Record a final status span on the trace.

        `details` may contain arbitrary error text, so the span tags and
        attributes are built with json.dumps to keep the header values
        valid JSON (the original f-string broke on quotes in `details`).
        """
        headers = self.base_headers.copy()
        headers.update({
            "adaline-trace-reference-id": trace_id,
            "adaline-trace-name": "conversational-agent",
            "adaline-trace-status": status,
            "adaline-span-name": "status-update",
            "adaline-span-tags": json.dumps(["status_update", status]),
            "adaline-span-attributes": json.dumps({"final_status": status, "details": details})
        })

        # Minimal, cheap completion whose only purpose is to carry the
        # status-update headers through the gateway.
        self.client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": "Status update"}],
            max_tokens=1,
            extra_headers=headers
        )

# Usage
assistant = ConversationalAgent()
reply = assistant.chat("What's the weather in New York and what's 15 * 24?", user_id="user789")
print(reply)

Best Practices for Complex Workflows

1. Consistent Trace Management

Use a single trace ID across all related operations:
trace_id = str(uuid.uuid4())
# Use the same trace_id for all spans in the workflow

2. Meaningful Span Names

Use descriptive span names that indicate the operation:
"adaline-span-name": "query-embedding"
"adaline-span-name": "vector-search"
"adaline-span-name": "response-generation"

3. Error Handling with Trace Status

Update trace status to reflect workflow state:
"adaline-trace-status": "in_progress"  # During execution
"adaline-trace-status": "completed"    # On success
"adaline-trace-status": "failed"       # On error