WebSocket access requires enterprise authentication. Contact enterprise@context.ai to enable WebSocket access for your organization.
Connection Overview
WebSocket connections provide:Real-time Updates
Live streaming of Background Agent task progress and completion notifications
Bidirectional Communication
Send commands and receive responses in real-time without HTTP polling
Efficient Resource Usage
Persistent connections reduce overhead compared to frequent HTTP requests
Enterprise Security
JWT-based authentication with automatic token refresh and connection recovery
WebSocket Endpoints
Background Agents WebSocket
Copy
Ask AI
wss://agents.yourcompany.com:8443/ws/tasks
Context Engine WebSocket
Copy
Ask AI
wss://api.context.ai/v1/context-engine/stream
Authentication
WebSocket Token Generation
Generate WebSocket-specific authentication tokens:Copy
Ask AI
async function getWebSocketToken(enterpriseToken) {
const response = await fetch('https://api.context.ai/v1/auth/websocket-token', {
method: 'POST',
headers: {
'Authorization': `Bearer ${enterpriseToken}`,
'Content-Type': 'application/json'
},
body: JSON.stringify({
purposes: ['background_agent_updates', 'context_engine_stream'],
expires_in: 3600, // 1 hour
organization_id: process.env.CONTEXT_ENTERPRISE_ORG_ID
})
});
const data = await response.json();
return data.websocket_token;
}
WebSocket Authentication Flow
Copy
Ask AI
class ContextWebSocket {
constructor(enterpriseToken) {
this.enterpriseToken = enterpriseToken;
this.ws = null;
this.isAuthenticated = false;
this.reconnectAttempts = 0;
this.maxReconnectAttempts = 5;
this.messageHandlers = new Map();
this.subscriptions = new Set();
}
async connect(endpoint) {
try {
// Get WebSocket token
const wsToken = await this.getWebSocketToken();
// Establish connection
this.ws = new WebSocket(endpoint);
this.ws.onopen = () => {
console.log('WebSocket connected');
this.authenticate(wsToken);
};
this.ws.onmessage = (event) => {
this.handleMessage(JSON.parse(event.data));
};
this.ws.onclose = (event) => {
console.log(`WebSocket closed: ${event.code} ${event.reason}`);
this.isAuthenticated = false;
this.handleReconnection();
};
this.ws.onerror = (error) => {
console.error('WebSocket error:', error);
};
} catch (error) {
console.error('Failed to connect to WebSocket:', error);
throw error;
}
}
authenticate(wsToken) {
this.send({
type: 'authenticate',
token: wsToken,
organization_id: process.env.CONTEXT_ENTERPRISE_ORG_ID
});
}
handleMessage(message) {
switch (message.type) {
case 'authenticated':
console.log('WebSocket authenticated successfully');
this.isAuthenticated = true;
this.reconnectAttempts = 0;
this.resubscribeAll();
break;
case 'authentication_failed':
console.error('WebSocket authentication failed:', message.error);
this.close();
break;
case 'error':
console.error('WebSocket error:', message.error);
break;
default:
// Handle message with registered handlers
const handler = this.messageHandlers.get(message.type);
if (handler) {
handler(message.data || message);
} else {
console.log('Unhandled message type:', message.type);
}
}
}
send(message) {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(message));
return true;
} else {
console.warn('WebSocket not connected, message queued');
return false;
}
}
// Event handler registration
on(messageType, handler) {
this.messageHandlers.set(messageType, handler);
}
off(messageType) {
this.messageHandlers.delete(messageType);
}
// Subscription management
subscribe(channel) {
this.subscriptions.add(channel);
if (this.isAuthenticated) {
this.send({
type: 'subscribe',
channel: channel
});
}
}
unsubscribe(channel) {
this.subscriptions.delete(channel);
if (this.isAuthenticated) {
this.send({
type: 'unsubscribe',
channel: channel
});
}
}
resubscribeAll() {
for (const channel of this.subscriptions) {
this.send({
type: 'subscribe',
channel: channel
});
}
}
async getWebSocketToken() {
const response = await fetch('https://api.context.ai/v1/auth/websocket-token', {
method: 'POST',
headers: {
'Authorization': `Bearer ${this.enterpriseToken}`,
'Content-Type': 'application/json'
},
body: JSON.stringify({
purposes: ['background_agent_updates', 'context_engine_stream'],
expires_in: 3600
})
});
const data = await response.json();
return data.websocket_token;
}
handleReconnection() {
if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.reconnectAttempts++;
const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts - 1), 30000);
console.log(`Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts})`);
setTimeout(() => {
this.connect();
}, delay);
} else {
console.error('Max reconnection attempts reached');
}
}
close() {
if (this.ws) {
this.ws.close();
this.ws = null;
}
this.isAuthenticated = false;
}
}
Background Agent WebSocket API
Task Updates and Notifications
Subscribe to real-time Background Agent task updates:Copy
Ask AI
// Connect to Background Agents WebSocket
const bgWebSocket = new ContextWebSocket(enterpriseToken);
await bgWebSocket.connect('wss://agents.yourcompany.com:8443/ws/tasks');
// Subscribe to task updates
bgWebSocket.subscribe('task_updates');
bgWebSocket.subscribe('system_notifications');
// Handle task status updates
bgWebSocket.on('task_started', (data) => {
console.log(`Task ${data.task_id} started:`, data);
updateUI('task_started', data);
});
bgWebSocket.on('task_progress', (data) => {
console.log(`Task ${data.task_id} progress: ${data.progress_percentage}%`);
updateProgressBar(data.task_id, data.progress_percentage);
});
bgWebSocket.on('task_completed', (data) => {
console.log(`Task ${data.task_id} completed:`, data);
showTaskResults(data.task_id, data.results_url);
});
bgWebSocket.on('task_failed', (data) => {
console.error(`Task ${data.task_id} failed:`, data.error_message);
showError(data.task_id, data.error_message);
});
bgWebSocket.on('system_notification', (data) => {
console.log('System notification:', data);
showNotification(data.message, data.severity);
});
Real-time Task Submission
Submit tasks via WebSocket for immediate feedback:Copy
Ask AI
// Submit task via WebSocket
function submitTaskViaWebSocket(taskConfig) {
const taskId = `task_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
bgWebSocket.send({
type: 'submit_task',
task_id: taskId,
task_config: taskConfig
});
return taskId;
}
// Handle task submission responses
bgWebSocket.on('task_submitted', (data) => {
console.log('Task submitted successfully:', data);
// Task is now in the system and will send updates via other message types
});
bgWebSocket.on('task_submission_failed', (data) => {
console.error('Task submission failed:', data);
showError(data.task_id, data.error_message);
});
// Usage
const taskId = submitTaskViaWebSocket({
task_type: "research_agent",
priority: 5,
input: {
query: "Analyze market trends for renewable energy",
sources: ["financial_reports", "market_data"],
depth: "comprehensive"
}
});
console.log(`Submitted task with ID: ${taskId}`);
Task Management Commands
Control tasks in real-time via WebSocket:Copy
Ask AI
// Cancel a running task
function cancelTask(taskId, reason) {
bgWebSocket.send({
type: 'cancel_task',
task_id: taskId,
reason: reason || 'User requested cancellation'
});
}
// Pause a task (if supported)
function pauseTask(taskId) {
bgWebSocket.send({
type: 'pause_task',
task_id: taskId
});
}
// Resume a paused task
function resumeTask(taskId) {
bgWebSocket.send({
type: 'resume_task',
task_id: taskId
});
}
// Update task priority
function updateTaskPriority(taskId, newPriority) {
bgWebSocket.send({
type: 'update_task_priority',
task_id: taskId,
priority: newPriority
});
}
// Handle management command responses
bgWebSocket.on('task_cancelled', (data) => {
console.log(`Task ${data.task_id} cancelled successfully`);
updateTaskStatus(data.task_id, 'cancelled');
});
bgWebSocket.on('task_paused', (data) => {
console.log(`Task ${data.task_id} paused`);
updateTaskStatus(data.task_id, 'paused');
});
bgWebSocket.on('task_resumed', (data) => {
console.log(`Task ${data.task_id} resumed`);
updateTaskStatus(data.task_id, 'processing');
});
bgWebSocket.on('command_error', (data) => {
console.error(`Command failed for task ${data.task_id}: ${data.error_message}`);
showError(data.task_id, data.error_message);
});
Context Engine WebSocket API
Streaming Queries
Stream Context Engine queries for real-time results:Copy
Ask AI
// Connect to Context Engine WebSocket
const ceWebSocket = new ContextWebSocket(enterpriseToken);
await ceWebSocket.connect('wss://api.context.ai/v1/context-engine/stream');
// Subscribe to query streams
ceWebSocket.subscribe('query_streams');
// Submit streaming query
function submitStreamingQuery(query) {
const queryId = `query_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
ceWebSocket.send({
type: 'stream_query',
query_id: queryId,
query: query.text,
context_scope: query.sources,
processing_mode: "streaming",
stream_options: {
include_intermediate_results: true,
max_response_time_ms: 30000,
confidence_threshold: 0.7
}
});
return queryId;
}
// Handle streaming responses
ceWebSocket.on('query_started', (data) => {
console.log(`Query ${data.query_id} started processing`);
showQueryProgress(data.query_id, 0);
});
ceWebSocket.on('intermediate_result', (data) => {
console.log(`Partial result for query ${data.query_id}:`, data);
appendPartialResult(data.query_id, data.partial_data);
});
ceWebSocket.on('query_completed', (data) => {
console.log(`Query ${data.query_id} completed:`, data);
showFinalResults(data.query_id, data.results);
});
ceWebSocket.on('query_failed', (data) => {
console.error(`Query ${data.query_id} failed:`, data.error_message);
showQueryError(data.query_id, data.error_message);
});
// Usage
const queryId = submitStreamingQuery({
text: "What are the latest trends in artificial intelligence investments?",
sources: ["financial_reports", "venture_capital_data", "technology_news"]
});
console.log(`Submitted streaming query with ID: ${queryId}`);
Context Updates
Receive real-time updates when Context Engine processes new data:Copy
Ask AI
// Subscribe to context updates
ceWebSocket.subscribe('context_updates');
ceWebSocket.on('context_indexed', (data) => {
console.log('New data indexed:', data);
showNotification(`New data source "${data.source_name}" has been indexed`, 'info');
});
ceWebSocket.on('context_updated', (data) => {
console.log('Context updated:', data);
invalidateCache(data.affected_queries);
});
ceWebSocket.on('relationship_discovered', (data) => {
console.log('New relationship discovered:', data);
updateKnowledgeGraph(data.relationship);
});
Message Formats
Standard Message Structure
All WebSocket messages follow this structure:Copy
Ask AI
{
"type": "message_type",
"timestamp": "2024-01-15T10:30:00Z",
"data": {
// Message-specific data
},
"metadata": {
"message_id": "msg_1234567890",
"organization_id": "org_abc123",
"correlation_id": "corr_xyz789"
}
}
Background Agent Message Types
Task Status Messages
Task Started
Copy
Ask AI
{
"type": "task_started",
"timestamp": "2024-01-15T10:30:00Z",
"data": {
"task_id": "task_123e4567-e89b-12d3-a456-426614174000",
"task_type": "research_agent",
"priority": 5,
"estimated_duration_minutes": 45
}
}
Task Progress
Copy
Ask AI
{
"type": "task_progress",
"timestamp": "2024-01-15T10:35:00Z",
"data": {
"task_id": "task_123e4567-e89b-12d3-a456-426614174000",
"progress_percentage": 25,
"current_stage": "data_collection",
"stages_completed": ["initialization", "source_validation"],
"estimated_remaining_minutes": 30
}
}
Task Completed
Copy
Ask AI
{
"type": "task_completed",
"timestamp": "2024-01-15T11:15:00Z",
"data": {
"task_id": "task_123e4567-e89b-12d3-a456-426614174000",
"duration_minutes": 45,
"results_available": true,
"results_url": "/api/v1/tasks/task_123e4567-e89b-12d3-a456-426614174000/results",
"confidence_score": 0.94
}
}
Context Engine Message Types
Query Started
Copy
Ask AI
{
"type": "query_started",
"timestamp": "2024-01-15T10:30:00Z",
"data": {
"query_id": "query_abc123def456",
"query": "Market analysis for tech sector",
"estimated_processing_time_ms": 5000
}
}
Intermediate Result
Copy
Ask AI
{
"type": "intermediate_result",
"timestamp": "2024-01-15T10:30:15Z",
"data": {
"query_id": "query_abc123def456",
"partial_data": {
"key_insights": ["Tech sector growth at 12%"],
"confidence": 0.8,
"sources_processed": 15,
"processing_progress": 0.6
}
}
}
Query Completed
Copy
Ask AI
{
"type": "query_completed",
"timestamp": "2024-01-15T10:30:35Z",
"data": {
"query_id": "query_abc123def456",
"results": {
"summary": "Comprehensive market analysis results",
"insights": ["Key insight 1", "Key insight 2"],
"confidence_score": 0.94,
"sources_analyzed": 47
},
"processing_time_ms": 35000
}
}
Error Handling
Connection Errors
Handle various connection and authentication errors:Copy
Ask AI
class WebSocketErrorHandler {
static handleError(ws, error) {
switch (error.type) {
case 'authentication_failed':
console.error('WebSocket authentication failed:', error.message);
// Attempt to refresh token and reconnect
ws.refreshTokenAndReconnect();
break;
case 'connection_error':
console.error('WebSocket connection error:', error.message);
// Attempt exponential backoff reconnection
ws.handleReconnection();
break;
case 'rate_limit_exceeded':
console.warn('WebSocket rate limit exceeded:', error.message);
// Wait for the specified duration before reconnecting
setTimeout(() => ws.connect(), error.retry_after * 1000);
break;
case 'invalid_message':
console.error('Invalid message sent:', error.message);
// Log for debugging, don't reconnect
break;
default:
console.error('Unknown WebSocket error:', error);
}
}
static isRetryableError(error) {
const retryableErrors = [
'connection_error',
'rate_limit_exceeded',
'temporary_server_error'
];
return retryableErrors.includes(error.type);
}
}
// Enhanced error handling in WebSocket class
bgWebSocket.on('error', (error) => {
WebSocketErrorHandler.handleError(bgWebSocket, error);
});
Message Validation
Validate incoming messages to ensure data integrity:Copy
Ask AI
class MessageValidator {
static validateMessage(message) {
const errors = [];
if (!message.type) {
errors.push('Message type is required');
}
if (!message.timestamp) {
errors.push('Message timestamp is required');
}
if (message.timestamp && !this.isValidTimestamp(message.timestamp)) {
errors.push('Invalid timestamp format');
}
return {
isValid: errors.length === 0,
errors: errors
};
}
static isValidTimestamp(timestamp) {
const date = new Date(timestamp);
return date instanceof Date && !isNaN(date);
}
static validateTaskMessage(message) {
const baseValidation = this.validateMessage(message);
if (!baseValidation.isValid) {
return baseValidation;
}
const errors = [];
if (message.type.startsWith('task_') && !message.data?.task_id) {
errors.push('Task ID is required for task messages');
}
return {
isValid: errors.length === 0,
errors: errors
};
}
}
// Use validation in message handler
bgWebSocket.handleMessage = function(rawMessage) {
const validation = MessageValidator.validateTaskMessage(rawMessage);
if (!validation.isValid) {
console.error('Invalid message received:', validation.errors);
return;
}
// Process valid message
this.processMessage(rawMessage);
};
Best Practices
Connection Management
Connection Pooling
- Reuse WebSocket connections across components
- Implement connection pooling for multiple channels
- Avoid creating multiple connections per user
Graceful Disconnection
- Close connections properly on page unload
- Unsubscribe from channels before disconnecting
- Handle browser refresh and navigation
Performance Optimization
Copy
Ask AI
class OptimizedWebSocket extends ContextWebSocket {
constructor(enterpriseToken) {
super(enterpriseToken);
this.messageQueue = [];
this.batchSize = 10;
this.batchTimeout = 100; // ms
}
// Batch messages for better performance
sendBatch(messages) {
if (this.isAuthenticated && messages.length > 0) {
this.send({
type: 'batch',
messages: messages
});
}
}
// Queue messages and send in batches
queueMessage(message) {
this.messageQueue.push(message);
if (this.messageQueue.length >= this.batchSize) {
this.flushQueue();
} else if (this.messageQueue.length === 1) {
// Start timeout for first message in queue
setTimeout(() => this.flushQueue(), this.batchTimeout);
}
}
flushQueue() {
if (this.messageQueue.length > 0) {
this.sendBatch([...this.messageQueue]);
this.messageQueue = [];
}
}
// Enhanced subscription management
bulkSubscribe(channels) {
this.send({
type: 'bulk_subscribe',
channels: channels
});
channels.forEach(channel => this.subscriptions.add(channel));
}
}
Security Considerations
Copy
Ask AI
// Secure WebSocket implementation
class SecureWebSocket extends ContextWebSocket {
constructor(enterpriseToken) {
super(enterpriseToken);
this.allowedOrigins = ['https://yourcompany.com'];
this.messageRateLimit = 100; // messages per minute
this.messageCounts = new Map();
}
validateOrigin() {
const origin = window.location.origin;
return this.allowedOrigins.includes(origin);
}
checkRateLimit(messageType) {
const now = Date.now();
const windowStart = now - 60000; // 1 minute window
if (!this.messageCounts.has(messageType)) {
this.messageCounts.set(messageType, []);
}
const timestamps = this.messageCounts.get(messageType);
// Remove old timestamps
const recentTimestamps = timestamps.filter(ts => ts > windowStart);
if (recentTimestamps.length >= this.messageRateLimit) {
return false; // Rate limit exceeded
}
recentTimestamps.push(now);
this.messageCounts.set(messageType, recentTimestamps);
return true;
}
send(message) {
if (!this.validateOrigin()) {
console.error('Invalid origin for WebSocket communication');
return false;
}
if (!this.checkRateLimit(message.type)) {
console.warn(`Rate limit exceeded for message type: ${message.type}`);
return false;
}
return super.send(message);
}
}
Testing WebSocket Integration
Unit Tests
Copy
Ask AI
const test = require('ava');
const WebSocket = require('ws');
const ContextWebSocket = require('./context-websocket');
test('should authenticate successfully', async t => {
const mockWS = {
send: sinon.spy(),
readyState: WebSocket.OPEN
};
const contextWS = new ContextWebSocket('test-token');
contextWS.ws = mockWS;
contextWS.authenticate('ws-token');
t.true(mockWS.send.calledOnce);
t.true(mockWS.send.calledWith(JSON.stringify({
type: 'authenticate',
token: 'ws-token',
organization_id: process.env.CONTEXT_ENTERPRISE_ORG_ID
})));
});
test('should handle task updates correctly', t => {
const contextWS = new ContextWebSocket('test-token');
let receivedUpdate = null;
contextWS.on('task_progress', (data) => {
receivedUpdate = data;
});
contextWS.handleMessage({
type: 'task_progress',
data: {
task_id: 'test-task-123',
progress_percentage: 50
}
});
t.deepEqual(receivedUpdate, {
task_id: 'test-task-123',
progress_percentage: 50
});
});
Integration Tests
Copy
Ask AI
test('end-to-end WebSocket communication', async t => {
const contextWS = new ContextWebSocket(process.env.TEST_ENTERPRISE_TOKEN);
// Connect and authenticate
await contextWS.connect('wss://test-agents.context.ai/ws/tasks');
// Wait for authentication
await new Promise((resolve) => {
contextWS.on('authenticated', resolve);
});
t.true(contextWS.isAuthenticated);
// Subscribe to task updates
contextWS.subscribe('task_updates');
// Submit a test task
const taskId = contextWS.submitTaskViaWebSocket({
task_type: 'research_agent',
input: { query: 'test query' }
});
// Wait for task completion
const result = await new Promise((resolve) => {
contextWS.on('task_completed', (data) => {
if (data.task_id === taskId) {
resolve(data);
}
});
});
t.truthy(result);
t.is(result.task_id, taskId);
contextWS.close();
});