Skip to main content
Context Enterprise provides WebSocket APIs for real-time communication with Context Engine and Background Agent services. This enables streaming responses, live task updates, and bidirectional communication for interactive applications.
WebSocket access requires enterprise authentication. Contact enterprise@context.ai to enable WebSocket access for your organization.

Connection Overview

WebSocket connections provide:

Real-time Updates

Live streaming of Background Agent task progress and completion notifications

Bidirectional Communication

Send commands and receive responses in real-time without HTTP polling

Efficient Resource Usage

Persistent connections reduce overhead compared to frequent HTTP requests

Enterprise Security

JWT-based authentication with automatic token refresh and connection recovery

WebSocket Endpoints

Background Agents WebSocket

wss://agents.yourcompany.com:8443/ws/tasks
For real-time Background Agent task updates and management.

Context Engine WebSocket

wss://api.context.ai/v1/context-engine/stream
For streaming Context Engine queries and responses.

Authentication

WebSocket Token Generation

Generate WebSocket-specific authentication tokens:
/**
 * Requests a WebSocket-specific authentication token.
 *
 * Sends a POST to the token endpoint with the enterprise token as a Bearer
 * credential, asking for a one-hour token valid for both the background
 * agent and context engine purposes.
 *
 * @param {string} enterpriseToken - Enterprise token placed in the Authorization header.
 * @returns {Promise<string>} The `websocket_token` field of the JSON response.
 * @throws {Error} If the HTTP status is not OK or the response has no `websocket_token`.
 */
async function getWebSocketToken(enterpriseToken) {
  const response = await fetch('https://api.context.ai/v1/auth/websocket-token', {
    method: 'POST',
    headers: {
      'Authorization': `Bearer ${enterpriseToken}`,
      'Content-Type': 'application/json'
    },
    body: JSON.stringify({
      purposes: ['background_agent_updates', 'context_engine_stream'],
      expires_in: 3600, // 1 hour
      organization_id: process.env.CONTEXT_ENTERPRISE_ORG_ID
    })
  });

  // fetch() only rejects on network failure; HTTP errors must be checked explicitly.
  if (!response.ok) {
    throw new Error(`WebSocket token request failed with HTTP ${response.status}`);
  }

  const data = await response.json();
  if (!data?.websocket_token) {
    // Fail here rather than handing `undefined` to the authentication step.
    throw new Error('WebSocket token response did not include websocket_token');
  }
  return data.websocket_token;
}

WebSocket Authentication Flow

/**
 * WebSocket client that authenticates with a freshly requested token, tracks
 * channel subscriptions, dispatches incoming messages to per-type handlers,
 * and reconnects with exponential backoff after an unexpected close.
 */
class ContextWebSocket {
  /**
   * @param {string} enterpriseToken - Enterprise token used to request WebSocket tokens.
   */
  constructor(enterpriseToken) {
    this.enterpriseToken = enterpriseToken;
    this.ws = null;
    this.endpoint = null; // last endpoint given to connect(); reused when reconnecting
    this.shouldReconnect = true; // cleared by close() so a deliberate close stays closed
    this.reconnectTimer = null; // handle of the pending backoff timer, if any
    this.isAuthenticated = false;
    this.reconnectAttempts = 0;
    this.maxReconnectAttempts = 5;
    this.messageHandlers = new Map();
    this.subscriptions = new Set();
  }

  /**
   * Requests a WebSocket token, opens the socket, and wires its event handlers.
   * The promise resolves once the socket object exists, not once it is open or
   * authenticated; authentication is sent from the `onopen` handler.
   *
   * @param {string} [endpoint] - WebSocket URL. Defaults to the endpoint of the previous call.
   * @returns {Promise<void>}
   * @throws {Error} If no endpoint is known, the token request fails, or the socket cannot be created.
   */
  async connect(endpoint = this.endpoint) {
    if (!endpoint) {
      throw new Error('A WebSocket endpoint is required to connect');
    }
    this.endpoint = endpoint;
    this.shouldReconnect = true;

    try {
      // Get WebSocket token
      const wsToken = await this.getWebSocketToken();

      // close() may have been called while the token request was in flight.
      if (!this.shouldReconnect) {
        return;
      }

      // Establish connection
      const socket = new WebSocket(endpoint);
      this.ws = socket;

      socket.onopen = () => {
        console.log('WebSocket connected');
        this.authenticate(wsToken);
      };

      socket.onmessage = (event) => {
        let message;
        try {
          message = JSON.parse(event.data);
        } catch (parseError) {
          // One malformed frame must not throw out of the socket callback.
          console.error('Discarding malformed WebSocket message:', parseError);
          return;
        }
        this.handleMessage(message);
      };

      socket.onclose = (event) => {
        console.log(`WebSocket closed: ${event.code} ${event.reason}`);
        this.isAuthenticated = false;
        // Only unexpected closes trigger backoff; close() opts out.
        if (this.shouldReconnect) {
          this.handleReconnection();
        }
      };

      socket.onerror = (error) => {
        console.error('WebSocket error:', error);
      };

    } catch (error) {
      console.error('Failed to connect to WebSocket:', error);
      throw error;
    }
  }

  /**
   * Sends the authentication message for the current socket.
   * @param {string} wsToken - Token returned by getWebSocketToken().
   */
  authenticate(wsToken) {
    this.send({
      type: 'authenticate',
      token: wsToken,
      organization_id: process.env.CONTEXT_ENTERPRISE_ORG_ID
    });
  }

  /**
   * Processes one parsed message. Connection-level types ('authenticated',
   * 'authentication_failed', 'error') update client state first; then any
   * handler registered via on() for that type is invoked, so callers can
   * observe those events as well as application messages.
   *
   * @param {object} message - Parsed message with a `type` and optional `data`.
   */
  handleMessage(message) {
    if (message === null || typeof message !== 'object') {
      console.warn('Ignoring non-object WebSocket message:', message);
      return;
    }

    let handledInternally = true;
    switch (message.type) {
      case 'authenticated':
        console.log('WebSocket authenticated successfully');
        this.isAuthenticated = true;
        this.reconnectAttempts = 0;
        this.resubscribeAll();
        break;

      case 'authentication_failed':
        console.error('WebSocket authentication failed:', message.error);
        // close() disables automatic reconnection; a registered handler can
        // still call connect() or handleReconnection() to retry.
        this.close();
        break;

      case 'error':
        console.error('WebSocket error:', message.error);
        break;

      default:
        handledInternally = false;
    }

    // Handle message with registered handlers
    const handler = this.messageHandlers.get(message.type);
    if (handler) {
      handler(message.data || message);
    } else if (!handledInternally) {
      console.log('Unhandled message type:', message.type);
    }
  }

  /**
   * Serializes and sends a message if the socket is open.
   * @param {object} message - JSON-serializable message.
   * @returns {boolean} true if handed to the socket, false if dropped because the socket is not open.
   */
  send(message) {
    if (this.ws && this.ws.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify(message));
      return true;
    }
    // The message is dropped, not queued; callers must check the return value.
    console.warn('WebSocket not connected, message not sent');
    return false;
  }

  // Event handler registration

  /**
   * Registers the handler for a message type, replacing any previous one.
   * @param {string} messageType
   * @param {function(object): void} handler - Receives `message.data`, or the whole message when `data` is falsy.
   */
  on(messageType, handler) {
    this.messageHandlers.set(messageType, handler);
  }

  /**
   * Removes the handler for a message type.
   * @param {string} messageType
   */
  off(messageType) {
    this.messageHandlers.delete(messageType);
  }

  // Subscription management

  /**
   * Records a channel subscription and sends it immediately when authenticated.
   * Channels recorded before authentication are sent by resubscribeAll().
   * @param {string} channel
   */
  subscribe(channel) {
    this.subscriptions.add(channel);
    if (this.isAuthenticated) {
      this.send({
        type: 'subscribe',
        channel: channel
      });
    }
  }

  /**
   * Forgets a channel subscription and notifies the server when authenticated.
   * @param {string} channel
   */
  unsubscribe(channel) {
    this.subscriptions.delete(channel);
    if (this.isAuthenticated) {
      this.send({
        type: 'unsubscribe',
        channel: channel
      });
    }
  }

  /** Sends a subscribe message for every recorded channel. */
  resubscribeAll() {
    for (const channel of this.subscriptions) {
      this.send({
        type: 'subscribe',
        channel: channel
      });
    }
  }

  /**
   * Requests a WebSocket token using the stored enterprise token.
   * @returns {Promise<string>} The `websocket_token` field of the JSON response.
   * @throws {Error} If the HTTP status is not OK.
   */
  async getWebSocketToken() {
    const response = await fetch('https://api.context.ai/v1/auth/websocket-token', {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${this.enterpriseToken}`,
        'Content-Type': 'application/json'
      },
      body: JSON.stringify({
        purposes: ['background_agent_updates', 'context_engine_stream'],
        expires_in: 3600
      })
    });

    // fetch() does not reject on HTTP error statuses.
    if (!response.ok) {
      throw new Error(`WebSocket token request failed with HTTP ${response.status}`);
    }

    const data = await response.json();
    return data.websocket_token;
  }

  /**
   * Schedules a reconnect to the last endpoint with exponential backoff
   * (1s, 2s, 4s, ... capped at 30s) until maxReconnectAttempts is reached.
   * A failed attempt schedules the next one.
   */
  handleReconnection() {
    if (!this.endpoint) {
      console.error('Cannot reconnect: connect() has not been called with an endpoint');
      return;
    }

    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.error('Max reconnection attempts reached');
      return;
    }

    this.reconnectAttempts++;
    const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts - 1), 30000);

    console.log(`Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts})`);

    // Never keep two backoff timers alive at once.
    clearTimeout(this.reconnectTimer);
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      this.connect(this.endpoint).catch(() => {
        // connect() already logged the failure; keep backing off.
        this.handleReconnection();
      });
    }, delay);
  }

  /**
   * Closes the socket deliberately: cancels any pending reconnect and
   * prevents the resulting close event from scheduling a new one.
   */
  close() {
    this.shouldReconnect = false;
    clearTimeout(this.reconnectTimer);
    this.reconnectTimer = null;
    if (this.ws) {
      this.ws.close();
      this.ws = null;
    }
    this.isAuthenticated = false;
  }
}

Background Agent WebSocket API

Task Updates and Notifications

Subscribe to real-time Background Agent task updates:
// Open the Background Agents connection
const bgWebSocket = new ContextWebSocket(enterpriseToken);
await bgWebSocket.connect('wss://agents.yourcompany.com:8443/ws/tasks');

// Register interest in the task and system channels
for (const channel of ['task_updates', 'system_notifications']) {
  bgWebSocket.subscribe(channel);
}

// Task lifecycle events
bgWebSocket.on('task_started', (event) => {
  console.log(`Task ${event.task_id} started:`, event);
  updateUI('task_started', event);
});

bgWebSocket.on('task_progress', ({ task_id: taskId, progress_percentage: percent }) => {
  console.log(`Task ${taskId} progress: ${percent}%`);
  updateProgressBar(taskId, percent);
});

bgWebSocket.on('task_completed', (event) => {
  const { task_id: taskId, results_url: resultsUrl } = event;
  console.log(`Task ${taskId} completed:`, event);
  showTaskResults(taskId, resultsUrl);
});

bgWebSocket.on('task_failed', ({ task_id: taskId, error_message: reason }) => {
  console.error(`Task ${taskId} failed:`, reason);
  showError(taskId, reason);
});

// System-wide notices
bgWebSocket.on('system_notification', (notice) => {
  console.log('System notification:', notice);
  showNotification(notice.message, notice.severity);
});

Real-time Task Submission

Submit tasks via WebSocket for immediate feedback:
/**
 * Submits a task over the Background Agents socket.
 *
 * Generates a client-side task ID from the current time and a random
 * base-36 suffix, then sends a `submit_task` message through `bgWebSocket`.
 *
 * @param {object} taskConfig - Task configuration forwarded as `task_config`.
 * @returns {string|null} The generated task ID, or null when `bgWebSocket.send`
 *   reported (by returning false) that the message was not sent.
 */
function submitTaskViaWebSocket(taskConfig) {
  // slice(2, 11) takes the same nine characters as the deprecated substr(2, 9).
  const randomSuffix = Math.random().toString(36).slice(2, 11);
  const taskId = `task_${Date.now()}_${randomSuffix}`;

  const wasSent = bgWebSocket.send({
    type: 'submit_task',
    task_id: taskId,
    task_config: taskConfig
  });

  // Do not hand back an ID for a task the server never received.
  if (wasSent === false) {
    return null;
  }

  return taskId;
}

// React to the server accepting or rejecting a submission
bgWebSocket.on('task_submitted', (ack) => {
  console.log('Task submitted successfully:', ack);
  // The task is now in the system; further updates arrive as other message types
});

bgWebSocket.on('task_submission_failed', (failure) => {
  console.error('Task submission failed:', failure);
  const { task_id: failedTaskId, error_message: reason } = failure;
  showError(failedTaskId, reason);
});

// Usage
const taskId = submitTaskViaWebSocket({
  task_type: 'research_agent',
  priority: 5,
  input: {
    query: 'Analyze market trends for renewable energy',
    sources: ['financial_reports', 'market_data'],
    depth: 'comprehensive'
  }
});

console.log(`Submitted task with ID: ${taskId}`);

Task Management Commands

Control tasks in real-time via WebSocket:
/**
 * Asks the server to cancel a task.
 * @param {string} taskId - ID of the task to cancel.
 * @param {string} [reason] - Cancellation reason; a default is used when falsy.
 */
function cancelTask(taskId, reason) {
  const cancellation = {
    type: 'cancel_task',
    task_id: taskId,
    reason: reason ? reason : 'User requested cancellation'
  };
  bgWebSocket.send(cancellation);
}

/**
 * Asks the server to pause a task (if supported).
 * @param {string} taskId - ID of the task to pause.
 */
function pauseTask(taskId) {
  const pauseCommand = { type: 'pause_task', task_id: taskId };
  bgWebSocket.send(pauseCommand);
}

/**
 * Asks the server to resume a paused task.
 * @param {string} taskId - ID of the task to resume.
 */
function resumeTask(taskId) {
  const resumeCommand = { type: 'resume_task', task_id: taskId };
  bgWebSocket.send(resumeCommand);
}

/**
 * Asks the server to change a task's priority.
 * @param {string} taskId - ID of the task to update.
 * @param {number} newPriority - Priority value forwarded as `priority`.
 */
function updateTaskPriority(taskId, newPriority) {
  const priorityCommand = {
    type: 'update_task_priority',
    task_id: taskId,
    priority: newPriority
  };
  bgWebSocket.send(priorityCommand);
}

// Responses to the management commands above
bgWebSocket.on('task_cancelled', ({ task_id: taskId }) => {
  console.log(`Task ${taskId} cancelled successfully`);
  updateTaskStatus(taskId, 'cancelled');
});

bgWebSocket.on('task_paused', ({ task_id: taskId }) => {
  console.log(`Task ${taskId} paused`);
  updateTaskStatus(taskId, 'paused');
});

bgWebSocket.on('task_resumed', ({ task_id: taskId }) => {
  console.log(`Task ${taskId} resumed`);
  updateTaskStatus(taskId, 'processing');
});

// A command the server could not carry out
bgWebSocket.on('command_error', ({ task_id: taskId, error_message: reason }) => {
  console.error(`Command failed for task ${taskId}: ${reason}`);
  showError(taskId, reason);
});

Context Engine WebSocket API

Streaming Queries

Stream Context Engine queries for real-time results:
// Connect to Context Engine WebSocket.
// connect() is awaited before subscribing; it takes the streaming endpoint URL.
const ceWebSocket = new ContextWebSocket(enterpriseToken);
await ceWebSocket.connect('wss://api.context.ai/v1/context-engine/stream');

// Subscribe to query streams (channel name: 'query_streams')
ceWebSocket.subscribe('query_streams');

/**
 * Submits a streaming query over the Context Engine socket.
 *
 * Generates a client-side query ID from the current time and a random
 * base-36 suffix, then sends a `stream_query` message through `ceWebSocket`
 * with fixed streaming options.
 *
 * @param {{text: string, sources: string[]}} query - `text` is sent as `query`,
 *   `sources` as `context_scope`.
 * @returns {string|null} The generated query ID, or null when `ceWebSocket.send`
 *   reported (by returning false) that the message was not sent.
 */
function submitStreamingQuery(query) {
  // slice(2, 11) takes the same nine characters as the deprecated substr(2, 9).
  const randomSuffix = Math.random().toString(36).slice(2, 11);
  const queryId = `query_${Date.now()}_${randomSuffix}`;

  const wasSent = ceWebSocket.send({
    type: 'stream_query',
    query_id: queryId,
    query: query.text,
    context_scope: query.sources,
    processing_mode: "streaming",
    stream_options: {
      include_intermediate_results: true,
      max_response_time_ms: 30000,
      confidence_threshold: 0.7
    }
  });

  // Do not hand back an ID for a query the server never received.
  if (wasSent === false) {
    return null;
  }

  return queryId;
}

// Streaming query lifecycle events
ceWebSocket.on('query_started', ({ query_id: startedId }) => {
  console.log(`Query ${startedId} started processing`);
  showQueryProgress(startedId, 0);
});

ceWebSocket.on('intermediate_result', (partial) => {
  console.log(`Partial result for query ${partial.query_id}:`, partial);
  appendPartialResult(partial.query_id, partial.partial_data);
});

ceWebSocket.on('query_completed', (completion) => {
  console.log(`Query ${completion.query_id} completed:`, completion);
  showFinalResults(completion.query_id, completion.results);
});

ceWebSocket.on('query_failed', ({ query_id: failedId, error_message: reason }) => {
  console.error(`Query ${failedId} failed:`, reason);
  showQueryError(failedId, reason);
});

// Usage
const queryId = submitStreamingQuery({
  text: 'What are the latest trends in artificial intelligence investments?',
  sources: ['financial_reports', 'venture_capital_data', 'technology_news']
});

console.log(`Submitted streaming query with ID: ${queryId}`);

Context Updates

Receive real-time updates when Context Engine processes new data:
// Listen on the context update channel
ceWebSocket.subscribe('context_updates');

// A new data source finished indexing
ceWebSocket.on('context_indexed', (indexed) => {
  console.log('New data indexed:', indexed);
  showNotification(`New data source "${indexed.source_name}" has been indexed`, 'info');
});

// Existing context changed; cached results for the affected queries are stale
ceWebSocket.on('context_updated', (update) => {
  console.log('Context updated:', update);
  invalidateCache(update.affected_queries);
});

// The engine linked two pieces of context
ceWebSocket.on('relationship_discovered', (discovery) => {
  console.log('New relationship discovered:', discovery);
  updateKnowledgeGraph(discovery.relationship);
});

Message Formats

Standard Message Structure

All WebSocket messages follow this structure:
{
  "type": "message_type",
  "timestamp": "2024-01-15T10:30:00Z",
  "data": {
    // Message-specific data
  },
  "metadata": {
    "message_id": "msg_1234567890",
    "organization_id": "org_abc123",
    "correlation_id": "corr_xyz789"
  }
}

Background Agent Message Types

Task Status Messages

Task Started

{
  "type": "task_started",
  "timestamp": "2024-01-15T10:30:00Z",
  "data": {
    "task_id": "task_123e4567-e89b-12d3-a456-426614174000",
    "task_type": "research_agent",
    "priority": 5,
    "estimated_duration_minutes": 45
  }
}

Task Progress

{
  "type": "task_progress",
  "timestamp": "2024-01-15T10:35:00Z",
  "data": {
    "task_id": "task_123e4567-e89b-12d3-a456-426614174000",
    "progress_percentage": 25,
    "current_stage": "data_collection",
    "stages_completed": ["initialization", "source_validation"],
    "estimated_remaining_minutes": 30
  }
}

Task Completed

{
  "type": "task_completed",
  "timestamp": "2024-01-15T11:15:00Z", 
  "data": {
    "task_id": "task_123e4567-e89b-12d3-a456-426614174000",
    "duration_minutes": 45,
    "results_available": true,
    "results_url": "/api/v1/tasks/task_123e4567-e89b-12d3-a456-426614174000/results",
    "confidence_score": 0.94
  }
}

Context Engine Message Types

Query Started

{
  "type": "query_started",
  "timestamp": "2024-01-15T10:30:00Z",
  "data": {
    "query_id": "query_abc123def456",
    "query": "Market analysis for tech sector",
    "estimated_processing_time_ms": 5000
  }
}

Intermediate Result

{
  "type": "intermediate_result",
  "timestamp": "2024-01-15T10:30:15Z",
  "data": {
    "query_id": "query_abc123def456",
    "partial_data": {
      "key_insights": ["Tech sector growth at 12%"],
      "confidence": 0.8,
      "sources_processed": 15,
      "processing_progress": 0.6
    }
  }
}

Query Completed

{
  "type": "query_completed",
  "timestamp": "2024-01-15T10:30:35Z",
  "data": {
    "query_id": "query_abc123def456",
    "results": {
      "summary": "Comprehensive market analysis results",
      "insights": ["Key insight 1", "Key insight 2"],
      "confidence_score": 0.94,
      "sources_analyzed": 47
    },
    "processing_time_ms": 35000
  }
}

Error Handling

Connection Errors

Handle various connection and authentication errors:
/**
 * Maps server-reported error types to a recovery action on a WebSocket client.
 */
class WebSocketErrorHandler {
  /**
   * Logs the error and triggers the recovery action for its `type`.
   *
   * @param {object} ws - Client exposing `handleReconnection()` and `connect()`,
   *   and optionally `refreshTokenAndReconnect()`.
   * @param {{type?: string, message?: string, retry_after?: number|string}} error -
   *   Error payload; `retry_after` is read as seconds.
   */
  static handleError(ws, error) {
    switch (error?.type) {
      case 'authentication_failed':
        console.error('WebSocket authentication failed:', error.message);
        // Attempt to refresh token and reconnect. Not every client defines
        // refreshTokenAndReconnect(); fall back to backoff reconnection
        // (presumably the client's connect path obtains a new token — verify).
        if (typeof ws.refreshTokenAndReconnect === 'function') {
          ws.refreshTokenAndReconnect();
        } else {
          ws.handleReconnection();
        }
        break;

      case 'connection_error':
        console.error('WebSocket connection error:', error.message);
        // Attempt exponential backoff reconnection
        ws.handleReconnection();
        break;

      case 'rate_limit_exceeded': {
        console.warn('WebSocket rate limit exceeded:', error.message);
        // parseFloat yields NaN for a missing/empty value instead of coercing it to 0.
        const retryAfterSeconds = Number.parseFloat(error.retry_after);
        if (Number.isFinite(retryAfterSeconds) && retryAfterSeconds >= 0) {
          // Wait for the specified duration before reconnecting
          setTimeout(async () => {
            try {
              await ws.connect();
            } catch (reconnectError) {
              console.error('WebSocket reconnect after rate limit failed:', reconnectError);
            }
          }, retryAfterSeconds * 1000);
        } else {
          // No usable delay from the server: back off rather than reconnect immediately.
          ws.handleReconnection();
        }
        break;
      }

      case 'invalid_message':
        console.error('Invalid message sent:', error.message);
        // Log for debugging, don't reconnect
        break;

      default:
        console.error('Unknown WebSocket error:', error);
    }
  }

  /**
   * @param {{type?: string}} error - Error payload.
   * @returns {boolean} true when the error type is one worth retrying.
   */
  static isRetryableError(error) {
    const retryableErrors = [
      'connection_error',
      'rate_limit_exceeded',
      'temporary_server_error'
    ];
    return retryableErrors.includes(error?.type);
  }
}

// Route 'error' messages from the socket through the shared error handler
bgWebSocket.on('error', (wsError) => {
  WebSocketErrorHandler.handleError(bgWebSocket, wsError);
});

Message Validation

Validate incoming messages to ensure data integrity:
/**
 * Structural validation for incoming WebSocket messages.
 * Every method returns `{ isValid, errors }` instead of throwing.
 */
class MessageValidator {
  /**
   * Checks the fields every message must carry.
   *
   * @param {*} message - Parsed message to validate.
   * @returns {{isValid: boolean, errors: string[]}} Validation outcome.
   */
  static validateMessage(message) {
    // Guard first: property access on null/primitives would throw or mislead.
    if (message === null || typeof message !== 'object') {
      return { isValid: false, errors: ['Message must be an object'] };
    }

    const errors = [];

    if (!message.type) {
      errors.push('Message type is required');
    } else if (typeof message.type !== 'string') {
      errors.push('Message type must be a string');
    }

    if (!message.timestamp) {
      errors.push('Message timestamp is required');
    } else if (!MessageValidator.isValidTimestamp(message.timestamp)) {
      errors.push('Invalid timestamp format');
    }

    return {
      isValid: errors.length === 0,
      errors: errors
    };
  }

  /**
   * @param {*} timestamp - Value accepted by the Date constructor.
   * @returns {boolean} true when it parses to a valid date.
   */
  static isValidTimestamp(timestamp) {
    return !Number.isNaN(new Date(timestamp).getTime());
  }

  /**
   * Runs the base validation, then requires `data.task_id` on any message
   * whose type starts with `task_`.
   *
   * @param {*} message - Parsed message to validate.
   * @returns {{isValid: boolean, errors: string[]}} Validation outcome.
   */
  static validateTaskMessage(message) {
    // Reference the class by name so the methods still work when detached from it.
    const baseValidation = MessageValidator.validateMessage(message);
    if (!baseValidation.isValid) {
      return baseValidation;
    }

    const errors = [];

    // Base validation guarantees `type` is a non-empty string here.
    if (message.type.startsWith('task_') && !message.data?.task_id) {
      errors.push('Task ID is required for task messages');
    }

    return {
      isValid: errors.length === 0,
      errors: errors
    };
  }
}

// Use validation in message handler.
// The override wraps the client's existing handleMessage so that valid
// messages still reach the normal dispatch logic. The block scope keeps the
// captured reference out of the surrounding namespace.
// NOTE(review): validateTaskMessage requires a timestamp on every message, so
// control frames sent without one would be rejected here — confirm the server
// always includes it.
{
  const dispatchValidatedMessage = bgWebSocket.handleMessage.bind(bgWebSocket);

  bgWebSocket.handleMessage = function(rawMessage) {
    const validation = MessageValidator.validateTaskMessage(rawMessage);

    if (!validation.isValid) {
      console.error('Invalid message received:', validation.errors);
      return;
    }

    // Process valid message
    dispatchValidatedMessage(rawMessage);
  };
}

Best Practices

Connection Management

Connection Pooling

  • Reuse WebSocket connections across components
  • Implement connection pooling for multiple channels
  • Avoid creating multiple connections per user

Graceful Disconnection

  • Close connections properly on page unload
  • Unsubscribe from channels before disconnecting
  • Handle browser refresh and navigation

Performance Optimization

/**
 * ContextWebSocket variant that batches outgoing messages and supports
 * subscribing to several channels with one message.
 */
class OptimizedWebSocket extends ContextWebSocket {
  /**
   * @param {string} enterpriseToken - Passed through to ContextWebSocket.
   */
  constructor(enterpriseToken) {
    super(enterpriseToken);
    this.messageQueue = [];
    this.batchSize = 10;
    this.batchTimeout = 100; // ms
    this.flushTimer = null; // pending batch timer, or null when none is armed
  }

  /**
   * Sends messages as one `batch` message when authenticated.
   *
   * @param {object[]} messages - Messages to wrap in the batch.
   * @returns {boolean} true when the batch was handed to send() and accepted.
   */
  sendBatch(messages) {
    if (!this.isAuthenticated || messages.length === 0) {
      return false;
    }
    return Boolean(this.send({
      type: 'batch',
      messages: messages
    }));
  }

  /**
   * Queues a message. The queue is flushed when it reaches `batchSize`, or
   * `batchTimeout` ms after a timer is armed, whichever comes first.
   *
   * @param {object} message - Message to queue.
   */
  queueMessage(message) {
    this.messageQueue.push(message);

    if (this.messageQueue.length >= this.batchSize) {
      this.flushQueue();
    } else if (this.flushTimer === null) {
      // Arm a timer whenever none is pending, so messages retained after a
      // failed flush are retried too, not only the first message of a batch.
      this.flushTimer = setTimeout(() => this.flushQueue(), this.batchTimeout);
    }
  }

  /**
   * Attempts to send everything in the queue as one batch. Messages stay
   * queued when the batch could not be sent (for example, before
   * authentication), so they are not lost. The queue is not bounded.
   */
  flushQueue() {
    // Cancel the pending timer so it cannot flush a later batch early.
    if (this.flushTimer !== null) {
      clearTimeout(this.flushTimer);
      this.flushTimer = null;
    }

    if (this.messageQueue.length === 0) {
      return;
    }

    if (this.sendBatch([...this.messageQueue])) {
      this.messageQueue = [];
    }
  }

  /**
   * Records several channel subscriptions and, when authenticated, sends
   * them in a single `bulk_subscribe` message. Mirrors subscribe(), which
   * also sends only when authenticated.
   *
   * @param {string[]} channels - Channels to subscribe to.
   */
  bulkSubscribe(channels) {
    channels.forEach(channel => this.subscriptions.add(channel));

    if (this.isAuthenticated && channels.length > 0) {
      this.send({
        type: 'bulk_subscribe',
        channels: channels
      });
    }
  }
}

Security Considerations

// Secure WebSocket implementation
/**
 * ContextWebSocket variant that refuses to send from a page origin outside
 * an allow-list and applies a per-message-type rate limit on the client.
 */
class SecureWebSocket extends ContextWebSocket {
  /**
   * @param {string} enterpriseToken - Passed through to ContextWebSocket.
   */
  constructor(enterpriseToken) {
    super(enterpriseToken);
    this.allowedOrigins = ['https://yourcompany.com'];
    this.messageRateLimit = 100; // messages per minute
    this.messageCounts = new Map();
  }

  /**
   * @returns {boolean} true when the current origin is allowed. Runtimes
   *   without a global `location` have no page origin to check, so the
   *   check passes there instead of throwing.
   */
  validateOrigin() {
    const origin = globalThis.location?.origin;
    if (origin === undefined) {
      return true;
    }
    return this.allowedOrigins.includes(origin);
  }

  /**
   * Records a send attempt for a message type within a sliding one-minute window.
   *
   * @param {string} messageType - Type used as the rate-limit bucket.
   * @returns {boolean} true when the send is within the limit (and was counted).
   */
  checkRateLimit(messageType) {
    const now = Date.now();
    const windowStart = now - 60000; // 1 minute window

    // Remove old timestamps
    const previous = this.messageCounts.get(messageType) ?? [];
    const recentTimestamps = previous.filter(ts => ts > windowStart);

    const withinLimit = recentTimestamps.length < this.messageRateLimit;
    if (withinLimit) {
      recentTimestamps.push(now);
    }

    // Always store the pruned list so expired entries do not linger while limited.
    this.messageCounts.set(messageType, recentTimestamps);
    return withinLimit;
  }

  /**
   * Sends a message after the origin and rate-limit checks pass.
   *
   * @param {object} message - JSON-serializable message with a `type`.
   * @returns {boolean} false when blocked by a check, otherwise the result of the base send().
   */
  send(message) {
    if (!this.validateOrigin()) {
      console.error('Invalid origin for WebSocket communication');
      return false;
    }

    if (!this.checkRateLimit(message.type)) {
      console.warn(`Rate limit exceeded for message type: ${message.type}`);
      return false;
    }

    return super.send(message);
  }
}

Testing WebSocket Integration

Unit Tests

const test = require('ava');
const WebSocket = require('ws');
const ContextWebSocket = require('./context-websocket');

// Verifies that authenticate() sends exactly one serialized 'authenticate'
// frame. Sent frames are recorded with a plain array rather than a spy
// library, so the test needs nothing beyond the requires above.
test('should authenticate successfully', async t => {
  const sentFrames = [];
  const mockWS = {
    send: (frame) => {
      sentFrames.push(frame);
    },
    readyState: WebSocket.OPEN
  };

  const contextWS = new ContextWebSocket('test-token');
  contextWS.ws = mockWS;
  contextWS.authenticate('ws-token');

  t.is(sentFrames.length, 1);
  // Compare serialized strings: JSON.stringify drops organization_id when
  // the environment variable is unset, on both sides.
  t.is(sentFrames[0], JSON.stringify({
    type: 'authenticate',
    token: 'ws-token',
    organization_id: process.env.CONTEXT_ENTERPRISE_ORG_ID
  }));
});

// Verifies that a 'task_progress' message reaches its registered handler
// with the message's `data` payload.
test('should handle task updates correctly', t => {
  const client = new ContextWebSocket('test-token');
  let captured = null;

  client.on('task_progress', (payload) => {
    captured = payload;
  });

  const incoming = {
    type: 'task_progress',
    data: {
      task_id: 'test-task-123',
      progress_percentage: 50
    }
  };
  client.handleMessage(incoming);

  const expectedPayload = {
    task_id: 'test-task-123',
    progress_percentage: 50
  };
  t.deepEqual(captured, expectedPayload);
});

Integration Tests

// End-to-end check against the test environment: connect, authenticate,
// submit a task over the socket, and wait for its completion message.
test('end-to-end WebSocket communication', async t => {
  const AUTH_TIMEOUT_MS = 10000;
  const TASK_TIMEOUT_MS = 60000;
  const POLL_INTERVAL_MS = 50;

  // Polls until `predicate` is true; fails with a clear error instead of hanging.
  const waitFor = async (predicate, timeoutMs, description) => {
    const deadline = Date.now() + timeoutMs;
    while (!predicate()) {
      if (Date.now() > deadline) {
        throw new Error(`Timed out waiting for ${description}`);
      }
      await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS));
    }
  };

  const contextWS = new ContextWebSocket(process.env.TEST_ENTERPRISE_TOKEN);

  try {
    // Connect and authenticate
    await contextWS.connect('wss://test-agents.context.ai/ws/tasks');

    // Wait for authentication by observing client state, which does not
    // depend on when (or whether) a listener was registered.
    await waitFor(() => contextWS.isAuthenticated, AUTH_TIMEOUT_MS, 'authentication');
    t.true(contextWS.isAuthenticated);

    // Subscribe to task updates
    contextWS.subscribe('task_updates');

    // Register the completion listener before submitting so a fast
    // completion cannot be missed.
    const taskId = `task_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
    let result = null;
    contextWS.on('task_completed', (data) => {
      if (data.task_id === taskId) {
        result = data;
      }
    });

    // Submit a test task through the client's own send()
    const wasSent = contextWS.send({
      type: 'submit_task',
      task_id: taskId,
      task_config: {
        task_type: 'research_agent',
        input: { query: 'test query' }
      }
    });
    t.true(wasSent);

    // Wait for task completion
    await waitFor(() => result !== null, TASK_TIMEOUT_MS, `completion of ${taskId}`);

    t.truthy(result);
    t.is(result.task_id, taskId);
  } finally {
    // Always release the socket, even when an assertion or wait fails.
    contextWS.close();
  }
});

Next Steps

I