External Integration Patterns¶
This document describes common patterns for integrating external systems with the Gunn multi-agent simulation core.
Overview¶
The Gunn simulation core provides multiple integration points through standardized APIs:
REST API: HTTP endpoints for web applications and services
WebSocket: Real-time streaming for low-latency interactions
gRPC: High-performance binary protocol for game engines
Message-oriented: Event-driven patterns for loose coupling
Integration Patterns¶
1. Game Engine Integration (Unity/Unreal)¶
Game engines typically need bidirectional, low-latency communication with the simulation core.
Unity Integration Pattern¶
// Unity C# client using gRPC
using Gunn.Simulation.V1;
using Grpc.Core;
public class UnitySimulationClient : MonoBehaviour
{
private UnityAdapter.UnityAdapterClient client;
private AsyncServerStreamingCall<ObservationDelta> observationStream;
async void Start()
{
// Connect to simulation core
var channel = new Channel("localhost:50051", ChannelCredentials.Insecure);
client = new UnityAdapter.UnityAdapterClient(channel);
// Start observation stream
var request = new ObservationRequest
{
WorldId = "unity_world_001",
AgentId = "player_agent"
};
observationStream = client.StreamObservations(request);
// Process observations in background
_ = ProcessObservations();
}
private async Task ProcessObservations()
{
await foreach (var delta in observationStream.ResponseStream.ReadAllAsync())
{
// Apply JSON patches to local game state
ApplyObservationDelta(delta);
}
}
public async void SubmitPlayerAction(string action, Vector3 position)
{
var intent = new Intent
{
Kind = Intent.Types.Kind.KindMove,
AgentId = "player_agent",
ReqId = System.Guid.NewGuid().ToString(),
ContextSeq = currentViewSeq,
Priority = 1,
SchemaVersion = "1.0.0"
};
// Set payload with Unity-specific data
var movePayload = new MovePayload
{
TargetPosition = new Vector3 { X = position.x, Y = position.y, Z = position.z },
Speed = 5.0f,
Run = true
};
intent.Payload = Any.Pack(movePayload);
try
{
var response = await client.SubmitIntentAsync(intent);
Debug.Log($"Intent accepted: {response.Status}");
}
catch (RpcException ex)
{
Debug.LogError($"Intent failed: {ex.Status}");
}
}
// Handle Unity physics events
void OnCollisionEnter(Collision collision)
{
var collisionEvent = new CollisionEvent
{
Entity1 = gameObject.name,
Entity2 = collision.gameObject.name,
ImpactForce = collision.impulse.magnitude,
Position = new Vector3
{
X = collision.contacts[0].point.x,
Y = collision.contacts[0].point.y,
Z = collision.contacts[0].point.z
}
};
var effectDraft = new EffectDraft
{
Kind = "Collision",
SourceId = "unity_adapter",
SchemaVersion = "1.0.0",
Payload = Any.Pack(collisionEvent)
};
_ = client.EmitEffectAsync(effectDraft);
}
}
Key Unity Integration Points¶
Time Synchronization: Unity sends TimeTick events to maintain simulation time
Physics Events: Collisions, triggers, and other physics events become Effects
Agent Actions: Player inputs and AI decisions become Intents
Visual Updates: ObservationDeltas drive visual state changes
Spatial Queries: Unity's spatial systems inform observation policies
2. Web Application Integration¶
Web applications typically use REST APIs with WebSocket streaming for real-time updates.
JavaScript/TypeScript Web Client¶
// Web client using REST API and WebSocket
class SimulationClient {
private baseUrl: string;
private authToken: string;
private websocket: WebSocket | null = null;
constructor(baseUrl: string, authToken: string) {
this.baseUrl = baseUrl;
this.authToken = authToken;
}
// Get current observation via REST
async getObservation(worldId: string, agentId: string): Promise<View | ObservationDelta> {
const response = await fetch(
`${this.baseUrl}/worlds/${worldId}/agents/${agentId}/observe`,
{
headers: {
'Authorization': `Bearer ${this.authToken}`,
'Content-Type': 'application/json'
}
}
);
if (!response.ok) {
const error = await response.json();
throw new SimulationError(error.error);
}
return response.json();
}
// Submit intent via REST
async submitIntent(worldId: string, agentId: string, intent: IntentRequest): Promise<IntentResponse> {
const response = await fetch(
`${this.baseUrl}/worlds/${worldId}/agents/${agentId}/intents`,
{
method: 'POST',
headers: {
'Authorization': `Bearer ${this.authToken}`,
'Content-Type': 'application/json'
},
body: JSON.stringify(intent)
}
);
if (!response.ok) {
const error = await response.json();
throw new SimulationError(error.error);
}
return response.json();
}
// Stream observations via WebSocket
streamObservations(worldId: string, agentId: string, onDelta: (delta: ObservationDelta) => void): void {
const wsUrl = `${this.baseUrl.replace('http', 'ws')}/worlds/${worldId}/agents/${agentId}/stream`;
this.websocket = new WebSocket(wsUrl, [], {
headers: {
'Authorization': `Bearer ${this.authToken}`
}
});
this.websocket.onmessage = (event) => {
const delta: ObservationDelta = JSON.parse(event.data);
onDelta(delta);
};
this.websocket.onerror = (error) => {
console.error('WebSocket error:', error);
};
this.websocket.onclose = (event) => {
if (event.code !== 1000) {
// Reconnect on unexpected close
setTimeout(() => this.streamObservations(worldId, agentId, onDelta), 1000);
}
};
}
// Apply JSON Patch to local state
applyObservationDelta(currentView: any, delta: ObservationDelta): any {
const jsonpatch = require('jsonpatch');
return jsonpatch.apply_patch(currentView, delta.patches);
}
}
// Error handling
class SimulationError extends Error {
constructor(public errorInfo: ErrorResponse) {
super(errorInfo.message);
this.name = 'SimulationError';
}
get recoveryAction(): string {
return this.errorInfo.recovery_action;
}
get errorCode(): number {
return this.errorInfo.code;
}
}
// Usage example
const client = new SimulationClient('https://api.gunn.example.com/v1', 'your-jwt-token');
// Get initial observation
const view = await client.getObservation('world_001', 'agent_001');
// Submit speaking intent
await client.submitIntent('world_001', 'agent_001', {
kind: 'Speak',
payload: { text: 'Hello, world!', target_agent: 'agent_002' },
context_seq: view.view_seq,
req_id: crypto.randomUUID(),
priority: 1,
schema_version: '1.0.0'
});
// Stream real-time updates
client.streamObservations('world_001', 'agent_001', (delta) => {
const updatedView = client.applyObservationDelta(currentView, delta);
updateUI(updatedView);
});
Key Web Integration Points¶
Authentication: JWT tokens or OAuth for API access
Real-time Updates: WebSocket streaming for low-latency observation deltas
Error Handling: Structured error responses with recovery actions
State Management: JSON Patch application for efficient UI updates
Offline Support: Local state caching and sync on reconnection
3. LLM Service Integration¶
LLM services need streaming capabilities with cancellation support for intelligent interruption.
Python LLM Adapter¶
import asyncio
from typing import AsyncIterator, Optional
import openai
from gunn.schemas.types import CancelToken, Intent, Effect
class LLMAdapter:
def __init__(self, api_key: str, model: str = "gpt-4.1-mini"):
self.client = openai.AsyncOpenAI(api_key=api_key)
self.model = model
async def generate_response(
self,
prompt: str,
context: dict,
cancel_token: CancelToken,
max_tokens: int = 150
) -> AsyncIterator[str]:
"""Generate streaming response with cancellation support."""
try:
stream = await self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": "You are an AI agent in a simulation."},
{"role": "user", "content": prompt}
],
max_tokens=max_tokens,
stream=True
)
async for chunk in stream:
# Check for cancellation every token
if cancel_token.cancelled:
await stream.aclose()
return
if chunk.choices[0].delta.content:
yield chunk.choices[0].delta.content
# Yield control every 20-30ms for responsive cancellation
await asyncio.sleep(0.025)
except Exception as e:
if not cancel_token.cancelled:
raise
async def process_intent(
self,
intent: Intent,
world_context: dict,
cancel_token: CancelToken
) -> Effect | None:
"""Process an intent and generate appropriate response."""
if intent["kind"] == "Speak":
# Generate speaking response
prompt = f"Respond to: {intent['payload'].get('text', '')}"
response_text = ""
async for token in self.generate_response(prompt, world_context, cancel_token):
response_text += token
if cancel_token.cancelled:
return None # Generation was cancelled
# Create effect for the response
effect: Effect = {
"uuid": str(uuid.uuid4()),
"kind": "SpeakResponse",
"payload": {
"text": response_text,
"agent_id": intent["agent_id"],
"in_response_to": intent["req_id"]
},
"global_seq": 0, # Will be filled by orchestrator
"sim_time": 0.0, # Will be filled by orchestrator
"source_id": "llm_adapter",
"schema_version": "1.0.0"
}
return effect
return None
# Usage with orchestrator
async def handle_agent_intent(orchestrator, intent: Intent):
# Issue cancel token for this generation
cancel_token = orchestrator.issue_cancel_token(intent["agent_id"], intent["req_id"])
# Start LLM generation
llm_adapter = LLMAdapter(api_key="your-api-key")
world_context = orchestrator.get_world_context(intent["agent_id"])
# Generate response with cancellation support
effect = await llm_adapter.process_intent(intent, world_context, cancel_token)
if effect and not cancel_token.cancelled:
# Submit effect to orchestrator
await orchestrator.broadcast_event(effect)
elif cancel_token.cancelled:
print(f"Generation cancelled: {cancel_token.reason}")
Key LLM Integration Points¶
Streaming Generation: Token-by-token generation with regular cancellation checks
Context Management: World state and conversation history as context
Cancellation Handling: Immediate halt on cancel token trigger
Error Recovery: Graceful handling of API failures and timeouts
Token Budgets: Respect token limits and quotas
4. Multi-Tenant SaaS Integration¶
SaaS applications need tenant isolation, authentication, and rate limiting.
Multi-Tenant Architecture¶
# Tenant-aware client wrapper
class TenantSimulationClient:
def __init__(self, base_client, tenant_id: str, api_key: str):
self.base_client = base_client
self.tenant_id = tenant_id
self.api_key = api_key
async def create_world(self, world_config: dict) -> str:
"""Create a new simulation world for this tenant."""
world_id = f"{self.tenant_id}_{uuid.uuid4().hex[:8]}"
# Initialize world with tenant-specific configuration
await self.base_client.initialize_world(
world_id=world_id,
config=world_config,
tenant_id=self.tenant_id
)
return world_id
async def register_agent(self, world_id: str, agent_config: dict) -> str:
"""Register an agent in a tenant's world."""
# Verify world belongs to this tenant
if not world_id.startswith(self.tenant_id):
raise PermissionError("Cannot access world from different tenant")
agent_id = f"{self.tenant_id}_{agent_config['name']}_{uuid.uuid4().hex[:8]}"
# Apply tenant-specific observation policies
observation_policy = self._create_tenant_policy(agent_config)
await self.base_client.register_agent(
world_id=world_id,
agent_id=agent_id,
policy=observation_policy
)
return agent_id
def _create_tenant_policy(self, agent_config: dict):
"""Create observation policy with tenant-specific constraints."""
# Apply tenant limits (e.g., observation range, relationship depth)
max_distance = min(agent_config.get('observation_range', 100),
self._get_tenant_limit('max_observation_range'))
return ObservationPolicy(
distance_limit=max_distance,
relationship_filter=agent_config.get('relationships', []),
tenant_id=self.tenant_id
)
# Rate limiting and quotas
class TenantRateLimiter:
def __init__(self, redis_client):
self.redis = redis_client
async def check_rate_limit(self, tenant_id: str, operation: str) -> bool:
"""Check if tenant is within rate limits for operation."""
key = f"rate_limit:{tenant_id}:{operation}"
current = await self.redis.get(key)
limits = {
'intents_per_minute': 1000,
'observations_per_minute': 5000,
'worlds_per_day': 10
}
limit = limits.get(operation, 100)
if current and int(current) >= limit:
return False
# Increment counter with expiration
await self.redis.incr(key)
await self.redis.expire(key, 60) # 1 minute window
return True
Key Multi-Tenant Integration Points¶
Tenant Isolation: World and agent IDs prefixed with tenant identifier
Resource Quotas: Per-tenant limits on worlds, agents, and operations
Rate Limiting: Per-tenant rate limits with Redis-based tracking
Data Segregation: Tenant-specific observation policies and access controls
Billing Integration: Usage tracking for metered billing
Error Handling Patterns¶
Structured Error Responses¶
All integrations should handle errors consistently using the structured format:
interface ErrorResponse {
error: {
code: number; // From docs/errors.md
name: string; // Error constant name
message: string; // Human-readable message
details: any; // Structured error details
recovery_action: string; // Suggested recovery action
timestamp: string; // ISO 8601 timestamp
trace_id: string; // For distributed tracing
};
}
// Error handling with recovery actions
async function handleSimulationError(error: ErrorResponse) {
switch (error.error.recovery_action) {
case 'RETRY':
await delay(1000);
return retryOperation();
case 'RETRY_WITH_DELAY':
const backoff = Math.min(1000 * Math.pow(2, retryCount), 30000);
await delay(backoff);
return retryOperation();
case 'REGENERATE':
// Cancel current generation and start fresh
await cancelCurrentGeneration();
return startNewGeneration();
case 'MODIFY_INTENT':
// Prompt user to modify their intent
return promptForIntentModification(error.error.details);
case 'DEFER':
// Queue for later processing
return queueForLater();
case 'ABORT':
// Permanent failure, notify user
throw new Error(error.error.message);
}
}
Circuit Breaker Pattern¶
For resilient integrations, implement circuit breaker patterns:
class CircuitBreaker:
def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 30.0):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.failure_count = 0
self.last_failure_time = 0
self.state = "CLOSED" # CLOSED, OPEN, HALF_OPEN
async def call(self, func, *args, **kwargs):
if self.state == "OPEN":
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = "HALF_OPEN"
else:
raise CircuitBreakerOpenError("Circuit breaker is open")
try:
result = await func(*args, **kwargs)
if self.state == "HALF_OPEN":
self.state = "CLOSED"
self.failure_count = 0
return result
except Exception as e:
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = "OPEN"
raise
# Usage
circuit_breaker = CircuitBreaker()
async def submit_intent_with_circuit_breaker(intent):
return await circuit_breaker.call(simulation_client.submit_intent, intent)
Performance Considerations¶
Connection Pooling¶
For high-throughput integrations, use connection pooling:
# gRPC connection pooling
class GrpcConnectionPool:
def __init__(self, target: str, pool_size: int = 10):
self.target = target
self.pool = asyncio.Queue(maxsize=pool_size)
# Pre-populate pool
for _ in range(pool_size):
channel = grpc.aio.insecure_channel(target)
client = UnityAdapterStub(channel)
self.pool.put_nowait(client)
async def get_client(self):
return await self.pool.get()
async def return_client(self, client):
await self.pool.put(client)
@asynccontextmanager
async def client(self):
client = await self.get_client()
try:
yield client
finally:
await self.return_client(client)
# Usage
pool = GrpcConnectionPool("localhost:50051")
async with pool.client() as client:
response = await client.SubmitIntent(intent)
Batch Operations¶
For efficiency, batch multiple operations:
async def batch_submit_intents(intents: List[Intent]) -> List[IntentResponse]:
"""Submit multiple intents in a single batch."""
batch_request = BatchIntentRequest(intents=intents)
response = await client.BatchSubmitIntents(batch_request)
return response.responses
# Observation delta batching
class ObservationBatcher:
def __init__(self, batch_size: int = 10, flush_interval: float = 0.1):
self.batch_size = batch_size
self.flush_interval = flush_interval
self.pending_deltas = []
self.last_flush = time.time()
async def add_delta(self, delta: ObservationDelta):
self.pending_deltas.append(delta)
if (len(self.pending_deltas) >= self.batch_size or
time.time() - self.last_flush > self.flush_interval):
await self.flush()
async def flush(self):
if self.pending_deltas:
await self.send_batch(self.pending_deltas)
self.pending_deltas.clear()
self.last_flush = time.time()
Security Best Practices¶
Input Validation¶
from pydantic import BaseModel, validator
class IntentRequest(BaseModel):
kind: str
payload: dict
context_seq: int
req_id: str
priority: int
schema_version: str
@validator('kind')
def validate_kind(cls, v):
allowed_kinds = ['Speak', 'Move', 'Interact', 'Custom']
if v not in allowed_kinds:
raise ValueError(f'Invalid intent kind: {v}')
return v
@validator('req_id')
def validate_req_id(cls, v):
if not re.match(r'^[a-zA-Z0-9_-]+$', v):
raise ValueError('Invalid req_id format')
return v
@validator('priority')
def validate_priority(cls, v):
if not -100 <= v <= 100:
raise ValueError('Priority must be between -100 and 100')
return v
This comprehensive integration guide provides patterns for the most common external system integrations with the Gunn simulation core, emphasizing proper error handling, performance optimization, and security best practices.