"""Core orchestrator for multi-agent simulation.
This module provides the central Orchestrator class that coordinates all system
operations including event ingestion, view generation, intent validation,
and observation distribution with deterministic ordering.
"""
import asyncio
import time
import uuid
from collections.abc import Awaitable, Callable
from typing import Any, Literal, Protocol
from gunn.core.concurrent_processor import (
BatchResult,
ConcurrentIntentProcessor,
ConcurrentProcessingConfig,
ProcessingMode,
)
from gunn.core.event_log import EventLog
from gunn.policies.observation import ObservationPolicy
from gunn.schemas.messages import WorldState
from gunn.schemas.types import (
CancelToken,
Effect,
EffectDraft,
Intent,
ObservationDelta,
)
from gunn.storage.dedup_store import DedupStore, InMemoryDedupStore
from gunn.utils.backpressure import backpressure_manager
from gunn.utils.errors import (
BackpressureError,
QuotaExceededError,
StaleContextError,
ValidationError,
)
from gunn.utils.memory import MemoryConfig, MemoryManager
from gunn.utils.scheduling import WeightedRoundRobinScheduler
from gunn.utils.telemetry import (
MonotonicClock,
async_performance_timer,
bandwidth_monitor,
get_logger,
get_tracer,
record_backpressure_event,
record_conflict,
record_intent_throughput,
record_observation_delivery_latency,
record_queue_depth,
record_queue_high_watermark,
system_monitor,
update_active_agents_count,
update_global_seq,
update_view_seq,
)
from gunn.utils.timing import TimedQueue
class EffectValidator(Protocol):
"""Protocol for validating intents before creating effects."""
def validate_intent(self, intent: Intent, world_state: WorldState) -> bool:
"""Validate if intent can be executed.
Args:
intent: Intent to validate
world_state: Current world state
Returns:
True if intent is valid and can be executed
"""
...
def set_agent_permissions(self, agent_id: str, permissions: set[str]) -> None:
"""Set permissions for an agent.
Args:
agent_id: Agent identifier
permissions: Set of permission strings
"""
...
# Type alias for effect handler functions
EffectHandler = Callable[[Effect, WorldState], Awaitable[None]]
class DefaultEffectValidator:
"""Default implementation of EffectValidator with comprehensive validation.
Provides validation for quota limits, cooldowns, permissions, and world state
constraints to ensure intents can be safely executed.
Requirements addressed:
- 1.5: Intent validation through EffectValidator before creating an Effect
- 3.4: Validation for quota limits, cooldowns, and permissions
- 10.3: Structured error codes for validation failures
"""
def __init__(
self,
max_intents_per_minute: int = 60,
max_tokens_per_minute: int = 10000,
default_cooldown_seconds: float = 1.0,
max_payload_size_bytes: int = 10000,
allowed_intent_kinds: set[str] | None = None,
):
"""Initialize validator with configuration.
Args:
max_intents_per_minute: Maximum intents per agent per minute
max_tokens_per_minute: Maximum tokens per agent per minute
default_cooldown_seconds: Default cooldown between intents
max_payload_size_bytes: Maximum payload size in bytes
allowed_intent_kinds: Set of allowed intent kinds (None = allow all)
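        Example (illustrative sketch; the limit values are arbitrary, not
        recommended defaults):
            ```python
            validator = DefaultEffectValidator(
                max_intents_per_minute=30,
                allowed_intent_kinds={"Speak", "Move"},
            )
            validator.set_agent_permissions(
                "agent_1", {"submit_intent", "intent:speak", "intent:move"}
            )
            ```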
"""
self.max_intents_per_minute = max_intents_per_minute
self.max_tokens_per_minute = max_tokens_per_minute
# Validation caching for performance optimization
self._permission_cache: dict[str, set[str]] = {}
self._validation_cache: dict[
str, tuple[bool, float]
] = {} # (is_valid, timestamp)
self._cache_ttl = 1.0 # Cache TTL in seconds
self.default_cooldown_seconds = default_cooldown_seconds
self.max_payload_size_bytes = max_payload_size_bytes
self.allowed_intent_kinds = allowed_intent_kinds or {
"Speak",
"Move",
"Interact",
"Custom",
}
# Tracking state for validation
self._agent_intent_history: dict[str, list[float]] = {}
self._agent_token_usage: dict[str, list[tuple[float, int]]] = {}
self._agent_last_intent_time: dict[str, float] = {}
self._agent_permissions: dict[str, set[str]] = {}
self._intent_kind_cooldowns: dict[str, float] = {
"Speak": 0.5,
"Move": 0.1,
"Interact": 2.0,
"Custom": 1.0,
}
def set_agent_permissions(self, agent_id: str, permissions: set[str]) -> None:
"""Set permissions for an agent.
Args:
agent_id: Agent identifier
permissions: Set of permission strings
"""
self._agent_permissions[agent_id] = permissions.copy()
# Invalidate cache for this agent
self._permission_cache.pop(agent_id, None)
# Invalidate validation cache entries for this agent
keys_to_remove = [
k for k in self._validation_cache.keys() if k.startswith(f"{agent_id}:")
]
for key in keys_to_remove:
self._validation_cache.pop(key, None)
def add_agent_permission(self, agent_id: str, permission: str) -> None:
"""Add a permission for an agent.
Args:
agent_id: Agent identifier
permission: Permission string to add
"""
if agent_id not in self._agent_permissions:
self._agent_permissions[agent_id] = set()
self._agent_permissions[agent_id].add(permission)
def remove_agent_permission(self, agent_id: str, permission: str) -> None:
"""Remove a permission for an agent.
Args:
agent_id: Agent identifier
permission: Permission string to remove
"""
if agent_id in self._agent_permissions:
self._agent_permissions[agent_id].discard(permission)
def set_intent_kind_cooldown(
self, intent_kind: str, cooldown_seconds: float
) -> None:
"""Set cooldown for a specific intent kind.
Args:
intent_kind: Intent kind to set cooldown for
cooldown_seconds: Cooldown duration in seconds
"""
self._intent_kind_cooldowns[intent_kind] = cooldown_seconds
def validate_intent(self, intent: Intent, world_state: WorldState) -> bool:
"""Validate if intent can be executed.
Performs comprehensive validation including:
- Basic structure validation
- Permission checks
- Quota limit validation
- Cooldown enforcement
- World state constraint checks
- Payload size limits
Args:
intent: Intent to validate
world_state: Current world state
Returns:
True if intent is valid and can be executed
Raises:
ValidationError: If validation fails with detailed reasons
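        Example (sketch of caller-side handling; ``reject`` is a hypothetical
        callback, not part of this module):
            ```python
            from gunn.utils.errors import ValidationError

            try:
                validator.validate_intent(intent, world_state)
            except ValidationError as e:
                reject(intent, e)  # e carries the list of failure messages
            ```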
"""
validation_failures: list[str] = []
current_time = time.time()
_agent_id = intent.get("agent_id", "")
_intent_kind = intent.get("kind", "")
try:
# 1. Basic structure validation
validation_failures.extend(self._validate_structure(intent))
# 2. Permission validation
validation_failures.extend(self._validate_permissions(intent))
# 3. Intent kind validation
validation_failures.extend(self._validate_intent_kind(intent))
# 4. Payload size validation
validation_failures.extend(self._validate_payload_size(intent))
# 5. Quota validation
validation_failures.extend(
self._validate_quota_limits(intent, current_time)
)
# 6. Cooldown validation
validation_failures.extend(self._validate_cooldowns(intent, current_time))
# 7. World state constraint validation
validation_failures.extend(
self._validate_world_state_constraints(intent, world_state)
)
# If any validation failed, raise ValidationError
if validation_failures:
raise ValidationError(intent, validation_failures)
# Record successful validation for quota/cooldown tracking
self._record_intent_validation(intent, current_time)
return True
except Exception as e:
# Re-raise ValidationError as-is, wrap others
if isinstance(e, ValidationError):
raise
# Wrap unexpected errors
validation_failures.append(f"validation_error: {e!s}")
raise ValidationError(intent, validation_failures) from e
def _validate_structure(self, intent: Intent) -> list[str]:
"""Validate basic intent structure.
Args:
intent: Intent to validate
Returns:
List of validation failure messages
"""
failures = []
required_fields = ["agent_id", "kind", "req_id", "schema_version"]
for field in required_fields:
if not intent.get(field):
failures.append(f"missing_required_field: {field}")
# Validate agent_id format
agent_id = intent.get("agent_id", "")
if agent_id and (
len(agent_id) > 100
or not agent_id.replace("_", "").replace("-", "").isalnum()
):
failures.append("invalid_agent_id_format")
# Validate req_id format
req_id = intent.get("req_id", "")
if req_id and len(req_id) > 200:
failures.append("req_id_too_long")
# Validate priority range
priority = intent.get("priority", 0)
if not isinstance(priority, int) or priority < -100 or priority > 100:
failures.append("invalid_priority_range")
return failures
def _validate_permissions(self, intent: Intent) -> list[str]:
"""Validate agent permissions for this intent.
Args:
intent: Intent to validate
Returns:
List of validation failure messages
"""
failures = []
agent_id = intent.get("agent_id", "")
intent_kind = intent.get("kind", "")
# Get agent permissions
agent_permissions = self._agent_permissions.get(agent_id, set())
# Check basic submit_intent permission
if "submit_intent" not in agent_permissions:
failures.append("missing_permission: submit_intent")
# Check intent-kind-specific permissions
kind_permission_map = {
"Speak": "intent:speak",
"Move": "intent:move",
"Interact": "intent:interact",
"Custom": "intent:custom",
}
required_permission = kind_permission_map.get(intent_kind)
if required_permission and required_permission not in agent_permissions:
failures.append(f"missing_permission: {required_permission}")
return failures
def _validate_intent_kind(self, intent: Intent) -> list[str]:
"""Validate intent kind is allowed.
Args:
intent: Intent to validate
Returns:
List of validation failure messages
"""
failures = []
intent_kind = intent.get("kind", "")
if intent_kind not in self.allowed_intent_kinds:
allowed_kinds = ", ".join(sorted(self.allowed_intent_kinds))
failures.append(
f"invalid_intent_kind: {intent_kind} (allowed: {allowed_kinds})"
)
return failures
def _validate_payload_size(self, intent: Intent) -> list[str]:
"""Validate payload size limits.
Args:
intent: Intent to validate
Returns:
List of validation failure messages
"""
failures = []
payload = intent.get("payload", {})
# Estimate payload size (rough JSON serialization size)
import json
try:
payload_size = len(json.dumps(payload, separators=(",", ":")))
if payload_size > self.max_payload_size_bytes:
failures.append(
f"payload_too_large: {payload_size} > {self.max_payload_size_bytes} bytes"
)
except (TypeError, ValueError):
failures.append("payload_not_serializable")
return failures
def _validate_quota_limits(self, intent: Intent, current_time: float) -> list[str]:
"""Validate quota limits for the agent.
Args:
intent: Intent to validate
current_time: Current timestamp
Returns:
List of validation failure messages
"""
failures = []
agent_id = intent.get("agent_id", "")
# Initialize tracking if needed
if agent_id not in self._agent_intent_history:
self._agent_intent_history[agent_id] = []
if agent_id not in self._agent_token_usage:
self._agent_token_usage[agent_id] = []
# Clean up old entries (older than 1 minute)
cutoff_time = current_time - 60.0
self._agent_intent_history[agent_id] = [
t for t in self._agent_intent_history[agent_id] if t > cutoff_time
]
self._agent_token_usage[agent_id] = [
(t, tokens)
for t, tokens in self._agent_token_usage[agent_id]
if t > cutoff_time
]
# Check intent quota
intent_count = len(self._agent_intent_history[agent_id])
if intent_count >= self.max_intents_per_minute:
failures.append(
f"intent_quota_exceeded: {intent_count} >= {self.max_intents_per_minute}"
)
# Check token quota (estimate tokens from payload)
payload = intent.get("payload", {})
estimated_tokens = self._estimate_token_usage(payload)
current_token_usage = sum(
tokens for _, tokens in self._agent_token_usage[agent_id]
)
if current_token_usage + estimated_tokens > self.max_tokens_per_minute:
failures.append(
f"token_quota_exceeded: {current_token_usage + estimated_tokens} > {self.max_tokens_per_minute}"
)
return failures
def _validate_cooldowns(self, intent: Intent, current_time: float) -> list[str]:
"""Validate cooldown constraints.
Args:
intent: Intent to validate
current_time: Current timestamp
Returns:
List of validation failure messages
"""
failures = []
agent_id = intent.get("agent_id", "")
intent_kind = intent.get("kind", "")
# Check last intent time for this agent
last_intent_time = self._agent_last_intent_time.get(agent_id, 0.0)
# Get cooldown for this intent kind
cooldown = self._intent_kind_cooldowns.get(
intent_kind, self.default_cooldown_seconds
)
time_since_last = current_time - last_intent_time
if time_since_last < cooldown:
remaining_cooldown = cooldown - time_since_last
failures.append(
f"cooldown_active: {remaining_cooldown:.2f}s remaining for {intent_kind}"
)
return failures
def _validate_world_state_constraints(
self, intent: Intent, world_state: WorldState
) -> list[str]:
"""Validate world state constraints for the intent.
Args:
intent: Intent to validate
world_state: Current world state
Returns:
List of validation failure messages
"""
failures = []
agent_id = intent.get("agent_id", "")
intent_kind = intent.get("kind", "")
payload = intent.get("payload", {})
# Check if agent exists in world state
if agent_id not in world_state.entities:
failures.append(f"agent_not_in_world: {agent_id}")
return failures # Can't validate further without agent entity
_agent_entity = world_state.entities[agent_id]
# Validate based on intent kind
if intent_kind == "Move":
failures.extend(
self._validate_move_constraints(agent_id, payload, world_state)
)
elif intent_kind == "Interact":
failures.extend(
self._validate_interact_constraints(agent_id, payload, world_state)
)
elif intent_kind == "Speak":
failures.extend(
self._validate_speak_constraints(agent_id, payload, world_state)
)
return failures
def _validate_move_constraints(
self, agent_id: str, payload: dict[str, Any], world_state: WorldState
) -> list[str]:
"""Validate movement constraints.
Validates payload structure and movement feasibility for Move intents.
Expected payload: {"to": [x, y, z]} or {"position": [x, y, z]} (legacy)
Args:
agent_id: Agent attempting to move
payload: Move intent payload
world_state: Current world state
Returns:
List of validation failure messages
"""
failures = []
# Check if agent has position
if agent_id not in world_state.spatial_index:
failures.append("agent_has_no_position")
return failures
current_pos = world_state.spatial_index[agent_id]
# Validate target position format (support both "to" and "position" fields)
target = payload.get("to") or payload.get("position")
if not target:
failures.append(
"missing_target_position: expected 'to' or 'position' field"
)
return failures
if not isinstance(target, (list, tuple)):
failures.append("invalid_target_position_type: must be list or tuple")
return failures
if len(target) < 2:
failures.append(
"invalid_target_position_format: must have at least 2 elements"
)
return failures
        try:
            # Convert to 3D if needed (2D -> add z=0); len(target) >= 2 is
            # guaranteed by the length check above
            if len(target) == 2:
                target_pos = (float(target[0]), float(target[1]), 0.0)
            else:
                target_pos = (float(target[0]), float(target[1]), float(target[2]))
except (ValueError, TypeError):
failures.append(
"invalid_target_position_values: coordinates must be numeric"
)
return failures
# Check movement distance (simple constraint)
distance = (
sum((a - b) ** 2 for a, b in zip(current_pos, target_pos, strict=False))
** 0.5
)
max_move_distance = 100.0 # Configurable constraint
if distance > max_move_distance:
failures.append(
f"move_distance_too_large: {distance:.2f} > {max_move_distance}"
)
# Check for collision with other entities (simplified)
for entity_id, entity_pos in world_state.spatial_index.items():
if entity_id == agent_id:
continue
entity_distance = (
sum((a - b) ** 2 for a, b in zip(target_pos, entity_pos, strict=False))
** 0.5
)
min_distance = 1.0 # Minimum distance between entities
if entity_distance < min_distance:
failures.append(f"target_position_occupied: too close to {entity_id}")
break
return failures
def _validate_interact_constraints(
self, agent_id: str, payload: dict[str, Any], world_state: WorldState
) -> list[str]:
"""Validate interaction constraints.
Validates payload structure and interaction feasibility for Interact intents.
Expected payload: {"target_id": str, "interaction_type": str | None, "data": dict | None}
Args:
agent_id: Agent attempting to interact
payload: Interact intent payload
world_state: Current world state
Returns:
List of validation failure messages
"""
failures = []
# Accept both "target_id" and "target" fields for backward compatibility
target_id = payload.get("target_id") or payload.get("target")
if not target_id:
failures.append("missing_target: expected 'target_id' or 'target' field")
return failures
if not isinstance(target_id, str):
failures.append("invalid_target_type: must be string")
return failures
# Check if target exists
if target_id not in world_state.entities:
failures.append(f"interaction_target_not_found: {target_id}")
return failures
# Check if agent and target are close enough (if both have positions)
if (
agent_id in world_state.spatial_index
and target_id in world_state.spatial_index
):
agent_pos = world_state.spatial_index[agent_id]
target_pos = world_state.spatial_index[target_id]
distance = (
sum((a - b) ** 2 for a, b in zip(agent_pos, target_pos, strict=False))
** 0.5
)
max_interact_distance = 5.0 # Configurable constraint
if distance > max_interact_distance:
failures.append(
f"interaction_target_too_far: {distance:.2f} > {max_interact_distance}"
)
# Validate interaction_type if present
interaction_type = payload.get("interaction_type") or payload.get("type")
if interaction_type is not None:
if not isinstance(interaction_type, str):
failures.append("invalid_interaction_type: must be string")
elif interaction_type and interaction_type not in [
"examine",
"use",
"talk",
"trade",
"pickup",
]:
failures.append(f"unknown_interaction_type: {interaction_type}")
# Validate data field if present
data = payload.get("data")
if data is not None and not isinstance(data, dict):
failures.append("invalid_data_type: must be dictionary")
return failures
def _validate_speak_constraints(
self, agent_id: str, payload: dict[str, Any], world_state: WorldState
) -> list[str]:
"""Validate speaking constraints.
Validates payload structure and message content for Speak intents.
Expected payload: {"text": str, "channel": str | None, "target_id": str | None}
Args:
agent_id: Agent attempting to speak
payload: Speak intent payload
world_state: Current world state
Returns:
List of validation failure messages
"""
failures = []
# Accept both "text" and "message" fields for backward compatibility
message = payload.get("text") or payload.get("message")
if message is None:
failures.append("missing_message: expected 'text' or 'message' field")
return failures
if not isinstance(message, str):
failures.append("invalid_message_type: must be string")
return failures
if not message.strip():
failures.append("empty_message: message cannot be empty or whitespace-only")
return failures
# Check message length
max_message_length = 1000 # Configurable constraint
if len(message) > max_message_length:
failures.append(f"message_too_long: {len(message)} > {max_message_length}")
# Validate channel field if present
channel = payload.get("channel")
if channel is not None:
valid_channels = ["public", "team", "private"]
if channel not in valid_channels:
failures.append(f"invalid_channel: must be one of {valid_channels}")
# Private channel requires target_id
if channel == "private" and not payload.get("target_id"):
failures.append("missing_target_id: required for private channel")
# Validate target_id if present
target_id = payload.get("target_id")
if target_id is not None:
if not isinstance(target_id, str) or not target_id.strip():
failures.append("invalid_target_id: must be non-empty string")
elif target_id not in world_state.entities:
failures.append(f"target_not_found: {target_id} does not exist")
# Check for prohibited content (simplified)
prohibited_words = ["spam", "abuse"] # Configurable list
message_lower = message.lower()
for word in prohibited_words:
if word in message_lower:
failures.append(f"prohibited_content: {word}")
return failures
def _estimate_token_usage(self, payload: dict[str, Any]) -> int:
"""Estimate token usage for a payload.
Args:
payload: Intent payload
Returns:
Estimated token count
"""
# Simple estimation: 1 token per 4 characters
import json
try:
payload_str = json.dumps(payload, separators=(",", ":"))
return max(1, len(payload_str) // 4)
except (TypeError, ValueError):
return 10 # Default estimate for non-serializable payloads
def _record_intent_validation(self, intent: Intent, current_time: float) -> None:
"""Record successful intent validation for tracking.
Args:
intent: Validated intent
current_time: Current timestamp
"""
agent_id = intent.get("agent_id", "")
payload = intent.get("payload", {})
# Record intent timestamp
if agent_id not in self._agent_intent_history:
self._agent_intent_history[agent_id] = []
self._agent_intent_history[agent_id].append(current_time)
# Record token usage
estimated_tokens = self._estimate_token_usage(payload)
if agent_id not in self._agent_token_usage:
self._agent_token_usage[agent_id] = []
self._agent_token_usage[agent_id].append((current_time, estimated_tokens))
# Update last intent time
self._agent_last_intent_time[agent_id] = current_time
def get_agent_quota_status(self, agent_id: str) -> dict[str, Any]:
"""Get current quota status for an agent.
Args:
agent_id: Agent identifier
Returns:
Dictionary with quota status information
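        Example (sketch; ``throttle`` is a hypothetical caller-side helper):
            ```python
            status = validator.get_agent_quota_status("agent_1")
            if status["intents_remaining"] == 0:
                throttle(status["time_since_last_intent"])
            ```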
"""
current_time = time.time()
cutoff_time = current_time - 60.0
# Clean up old entries
intent_history = self._agent_intent_history.get(agent_id, [])
intent_history = [t for t in intent_history if t > cutoff_time]
token_usage = self._agent_token_usage.get(agent_id, [])
token_usage = [(t, tokens) for t, tokens in token_usage if t > cutoff_time]
current_tokens = sum(tokens for _, tokens in token_usage)
last_intent_time = self._agent_last_intent_time.get(agent_id, 0.0)
return {
"agent_id": agent_id,
"intents_used": len(intent_history),
"intents_limit": self.max_intents_per_minute,
"intents_remaining": max(
0, self.max_intents_per_minute - len(intent_history)
),
"tokens_used": current_tokens,
"tokens_limit": self.max_tokens_per_minute,
"tokens_remaining": max(0, self.max_tokens_per_minute - current_tokens),
"last_intent_time": last_intent_time,
"time_since_last_intent": current_time - last_intent_time,
"permissions": list(self._agent_permissions.get(agent_id, set())),
}
class AgentHandle:
"""Per-agent interface for observation and intent submission.
Provides isolated interface for each agent with view sequence tracking
and non-blocking operations.
"""
def __init__(self, agent_id: str, orchestrator: "Orchestrator"):
"""Initialize agent handle.
Args:
agent_id: Unique identifier for this agent
orchestrator: Reference to the orchestrator
"""
self.agent_id = agent_id
self.orchestrator = orchestrator
self.view_seq: int = 0
async def next_observation(self) -> Any:
"""Get next observation delta from orchestrator's timed queue.
Returns:
Next observation delta when available
Raises:
RuntimeError: If agent is not registered or queue is closed
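        Example (sketch of a typical agent loop; ``react_to`` is hypothetical):
            ```python
            while True:
                delta = await handle.next_observation()
                await react_to(delta)
            ```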
"""
if self.agent_id not in self.orchestrator._per_agent_queues:
raise RuntimeError(f"Agent {self.agent_id} is not registered")
async with async_performance_timer(
"next_observation",
agent_id=self.agent_id,
view_seq=self.view_seq,
logger=self.orchestrator._logger,
tracer_name="gunn.agent_handle",
):
timed_queue = self.orchestrator._per_agent_queues[self.agent_id]
delta = await timed_queue.get()
# Update view sequence from delta
if hasattr(delta, "view_seq"):
self.view_seq = delta.view_seq
elif isinstance(delta, dict) and "view_seq" in delta:
self.view_seq = delta["view_seq"]
# Update view sequence metric
update_view_seq(self.agent_id, self.view_seq)
return delta
async def submit_intent(self, intent: Intent) -> str:
"""Submit intent for validation and processing.
Args:
intent: Intent to submit
Returns:
Request ID for tracking the intent
"""
return await self.orchestrator.submit_intent(intent)
async def cancel(self, req_id: str) -> None:
"""Cancel pending intent by req_id.
Args:
req_id: Request ID to cancel
"""
key = (self.orchestrator.world_id, self.agent_id, req_id)
if key in self.orchestrator._cancel_tokens:
self.orchestrator._cancel_tokens[key].cancel("user_requested")
def get_view_seq(self) -> int:
"""Get current view sequence number.
Returns:
Current view sequence number for this agent
"""
return self.view_seq
class OrchestratorConfig:
"""Configuration for the Orchestrator."""
def __init__(
self,
max_agents: int = 100,
staleness_threshold: int = 0,
debounce_ms: float = 100.0,
deadline_ms: float = 5000.0,
token_budget: int = 1000,
backpressure_policy: str = "defer",
default_priority: int = 0,
dedup_ttl_minutes: int = 60,
max_dedup_entries: int = 10000,
dedup_cleanup_interval_minutes: int = 10,
dedup_warmup_minutes: int = 5,
max_queue_depth: int = 100,
quota_intents_per_minute: int = 60,
quota_tokens_per_minute: int = 10000,
use_in_memory_dedup: bool = False,
processing_idle_shutdown_ms: float = 250.0,
# Memory management settings
max_log_entries: int = 10000,
view_cache_size: int = 1000,
compaction_threshold: int = 5000,
snapshot_interval: int = 1000,
max_snapshots: int = 10,
memory_check_interval_seconds: float = 60.0,
auto_compaction_enabled: bool = True,
# Concurrent processing settings
concurrent_processing_config: ConcurrentProcessingConfig | None = None,
):
"""Initialize orchestrator configuration.
Args:
max_agents: Maximum number of agents
staleness_threshold: Threshold for staleness detection
debounce_ms: Debounce time for interruptions
deadline_ms: Deadline for intent processing
token_budget: Token budget for generation
backpressure_policy: Policy for handling backpressure
default_priority: Default priority when not specified
dedup_ttl_minutes: TTL for deduplication entries
max_dedup_entries: Maximum deduplication entries
dedup_cleanup_interval_minutes: Cleanup interval for dedup store
dedup_warmup_minutes: Warmup period for relaxed deduplication
max_queue_depth: Maximum queue depth per agent
quota_intents_per_minute: Intent quota per agent per minute
quota_tokens_per_minute: Token quota per agent per minute
use_in_memory_dedup: Use in-memory dedup store for testing
processing_idle_shutdown_ms: Idle duration before background intent loop auto-stops (<=0 disables)
max_log_entries: Maximum number of log entries before compaction
view_cache_size: Maximum size of view cache
compaction_threshold: Threshold for triggering log compaction
snapshot_interval: Create snapshot every N events
max_snapshots: Maximum number of snapshots to keep
memory_check_interval_seconds: Interval between memory checks
auto_compaction_enabled: Enable automatic log compaction
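        Example (illustrative values only):
            ```python
            config = OrchestratorConfig(
                max_agents=10,
                staleness_threshold=0,
                backpressure_policy="defer",
                use_in_memory_dedup=True,  # avoids the on-disk dedup DB in tests
            )
            ```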
"""
self.max_agents = max_agents
self.staleness_threshold = staleness_threshold
self.debounce_ms = debounce_ms
self.deadline_ms = deadline_ms
self.token_budget = token_budget
self.backpressure_policy = backpressure_policy
self.default_priority = default_priority
self.dedup_ttl_minutes = dedup_ttl_minutes
self.max_dedup_entries = max_dedup_entries
self.dedup_cleanup_interval_minutes = dedup_cleanup_interval_minutes
self.dedup_warmup_minutes = dedup_warmup_minutes
self.max_queue_depth = max_queue_depth
self.quota_intents_per_minute = quota_intents_per_minute
self.quota_tokens_per_minute = quota_tokens_per_minute
self.use_in_memory_dedup = use_in_memory_dedup
self.processing_idle_shutdown_ms = processing_idle_shutdown_ms
# Memory management settings
self.max_log_entries = max_log_entries
self.view_cache_size = view_cache_size
self.compaction_threshold = compaction_threshold
self.snapshot_interval = snapshot_interval
self.max_snapshots = max_snapshots
self.memory_check_interval_seconds = memory_check_interval_seconds
self.auto_compaction_enabled = auto_compaction_enabled
# Concurrent processing configuration
self.concurrent_processing_config = (
concurrent_processing_config or ConcurrentProcessingConfig()
)
class Orchestrator:
"""Central coordinator for multi-agent simulation.
Coordinates all system operations including event ingestion and ordering,
view generation and delta distribution, intent validation and effect creation,
and cancellation token management.
Requirements addressed:
- 1.1: Create WorldState as single source of truth
- 1.3: Generate View based on agent's observation policy
- 1.4: Process events using deterministic ordering
- 9.1: Deterministic ordering using (sim_time, priority, source_id, uuid) tuple
- 9.3: Fixed tie-breaker: priority > source > UUID lexicographic
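    Example (minimal lifecycle sketch; ``policy`` is assumed to be a configured
    ObservationPolicy instance):
        ```python
        orchestrator = Orchestrator(OrchestratorConfig(use_in_memory_dedup=True))
        await orchestrator.initialize()
        handle = await orchestrator.register_agent("agent_1", policy)
        ```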
"""
def __init__(
self,
config: OrchestratorConfig,
world_id: str = "default",
effect_validator: EffectValidator | None = None,
):
"""Initialize orchestrator with configuration and dependencies.
Args:
config: Orchestrator configuration
world_id: Identifier for this world instance
effect_validator: Optional custom effect validator
"""
self.world_id = world_id
self.config = config
self.event_log: EventLog = EventLog(world_id)
self.world_state: WorldState = WorldState()
self.observation_policies: dict[str, ObservationPolicy] = {}
self.effect_validator: EffectValidator = (
effect_validator or DefaultEffectValidator()
)
self.agent_handles: dict[str, AgentHandle] = {}
# Effect handler registry for custom effect kinds
self._effect_handlers: dict[str, EffectHandler] = {}
# Initialization tracking for lazy startup
self._initialized: bool = False
self._initialize_lock: asyncio.Lock | None = None
# Two-phase commit infrastructure
if config.use_in_memory_dedup:
self._dedup_store: DedupStore | InMemoryDedupStore = InMemoryDedupStore(
dedup_ttl_minutes=config.dedup_ttl_minutes,
max_entries=config.max_dedup_entries,
warmup_duration_minutes=config.dedup_warmup_minutes,
)
else:
self._dedup_store = DedupStore(
db_path=f"{world_id}_dedup.db",
dedup_ttl_minutes=config.dedup_ttl_minutes,
max_entries=config.max_dedup_entries,
cleanup_interval_minutes=config.dedup_cleanup_interval_minutes,
warmup_duration_minutes=config.dedup_warmup_minutes,
)
# Intent processing pipeline
self._scheduler = WeightedRoundRobinScheduler(
max_queue_depth=config.max_queue_depth,
priority_levels=3, # 0=high, 1=normal, 2=low
)
# Backpressure management
self._backpressure_manager = backpressure_manager
self._agent_backpressure_policies: dict[
str, str
] = {} # agent_id -> policy_name
# Quota tracking
self._quota_tracker: dict[
str, dict[str, list[float]]
] = {} # agent_id -> {intents: timestamps, tokens: timestamps}
# Internal state
self._global_seq: int = 0
self._cancel_tokens: dict[
tuple[str, str, str], CancelToken
] = {} # (world_id, agent_id, req_id)
self._per_agent_queues: dict[str, TimedQueue] = {} # Timed delivery queues
self._sim_time_authority: str = "none" # Which adapter controls sim_time
self._processing_task: asyncio.Task[None] | None = None
self._system_monitoring_task: asyncio.Task[None] | None = None
self._shutdown_event = asyncio.Event()
# Cancellation and staleness detection enhancements
self._last_cancellation_time: dict[str, float] = {} # agent_id -> timestamp
self._agent_interrupt_policies: dict[
str, str
] = {} # agent_id -> policy ("always" | "only_conflict")
# Memory management
memory_config = MemoryConfig(
max_log_entries=config.max_log_entries,
view_cache_size=config.view_cache_size,
compaction_threshold=config.compaction_threshold,
snapshot_interval=config.snapshot_interval,
max_snapshots=config.max_snapshots,
memory_check_interval_seconds=config.memory_check_interval_seconds,
auto_compaction_enabled=config.auto_compaction_enabled,
)
self.memory_manager = MemoryManager(memory_config)
# Concurrent processing
self._concurrent_processor = ConcurrentIntentProcessor(
config.concurrent_processing_config
)
self._concurrent_processor.set_intent_processor(
self._process_single_intent_for_concurrent
)
# Logging and telemetry
self._logger = get_logger("gunn.orchestrator", world_id=world_id)
self._tracer = get_tracer("gunn.orchestrator")
# Start system monitoring
self._start_system_monitoring()
self._logger.info(
"Orchestrator initialized",
world_id=world_id,
max_agents=config.max_agents,
sim_time_authority=self._sim_time_authority,
dedup_store_type=type(self._dedup_store).__name__,
)
async def initialize(self) -> None:
"""Initialize orchestrator and start processing pipeline if needed."""
await self._ensure_initialized()
self._start_processing_loop()
async def _ensure_initialized(self) -> None:
"""Lazily initialize orchestrator internals exactly once."""
if self._initialized:
return
if self._initialize_lock is None:
self._initialize_lock = asyncio.Lock()
async with self._initialize_lock:
if self._initialized:
return # type: ignore
# Initialize deduplication store and reset shutdown state
await self._dedup_store.initialize()
self._shutdown_event.clear()
# Start system monitoring task
if hasattr(self, "_monitor_system_func"):
self._system_monitoring_task = asyncio.create_task(
self._monitor_system_func()
)
self._initialized = True
self._logger.info("Orchestrator dependencies initialized")
def _start_processing_loop(self) -> None:
"""Ensure background intent processing task is running."""
if not self._initialized:
return
if self._processing_task and not self._processing_task.done():
return
self._processing_task = asyncio.create_task(self._intent_processing_loop())
def set_sim_time_authority(
self, authority: Literal["unity", "unreal", "none"]
) -> None:
"""Set which adapter controls sim_time.
Args:
authority: Authority for sim_time ("unity", "unreal", or "none")
"""
self._sim_time_authority = authority
self._logger.info("Sim time authority changed", authority=authority)
async def register_agent(
self,
agent_id: str,
policy: ObservationPolicy,
permissions: set[str] | None = None,
) -> AgentHandle:
"""Register a new agent with observation policy and permissions.
Args:
agent_id: Unique identifier for the agent
policy: Observation policy for this agent
permissions: Set of permissions for the agent. If None, uses default permissions.
Returns:
AgentHandle for the registered agent
Raises:
ValueError: If agent_id is empty or already registered
RuntimeError: If maximum agents exceeded
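        Example (sketch; ``my_policy`` is assumed to be an ObservationPolicy):
            ```python
            handle = await orchestrator.register_agent(
                "agent_1",
                my_policy,
                permissions={"submit_intent", "intent:speak"},
            )
            ```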
"""
if not agent_id.strip():
raise ValueError("Agent ID cannot be empty")
if agent_id in self.agent_handles:
raise ValueError(f"Agent {agent_id} is already registered")
if len(self.agent_handles) >= self.config.max_agents:
raise RuntimeError(f"Maximum agents ({self.config.max_agents}) exceeded")
# Create agent handle
handle = AgentHandle(agent_id, self)
# Store policy and handle
self.observation_policies[agent_id] = policy
self.agent_handles[agent_id] = handle
# Create timed queue for this agent
self._per_agent_queues[agent_id] = TimedQueue()
# Set agent permissions (use defaults if not provided)
if permissions is None:
# Default permissions for newly registered agents
permissions = {
"submit_intent",
"intent:move",
"intent:speak",
"intent:interact",
"intent:custom",
}
self.effect_validator.set_agent_permissions(agent_id, permissions)
# Update active agents count metric
update_active_agents_count(len(self.agent_handles))
self._logger.info(
"Agent registered",
agent_id=agent_id,
total_agents=len(self.agent_handles),
policy_type=type(policy).__name__,
permissions=list(permissions),
)
return handle
def register_effect_handler(self, effect_kind: str, handler: EffectHandler) -> None:
"""Register a custom effect handler for a specific effect kind.
This enables domain-specific effect types beyond the built-in kinds
(Move, Speak, Interact, EntityCreated, EntityRemoved, MessageEmitted).
Args:
effect_kind: The effect kind string to handle (e.g., "Attack", "Heal")
handler: Async function that takes (effect, world_state) and applies the effect
Example:
```python
async def handle_attack(effect: Effect, world_state: WorldState) -> None:
attacker_id = effect["source_id"]
target_id = effect["payload"]["target_id"]
damage = effect["payload"]["damage"]
# Apply damage logic to world_state
...
orchestrator.register_effect_handler("Attack", handle_attack)
```
Notes:
- Handlers are called after built-in effect kinds are checked
- Handlers should update world_state in-place
- Handlers should raise exceptions on failure (will be logged)
- Multiple handlers can be registered for different effect kinds
"""
if not effect_kind:
raise ValueError("effect_kind cannot be empty")
if not callable(handler):
raise ValueError("handler must be callable")
self._effect_handlers[effect_kind] = handler
self._logger.info(
"Custom effect handler registered",
effect_kind=effect_kind,
            handler_name=getattr(handler, "__name__", str(handler)),
)
def unregister_effect_handler(self, effect_kind: str) -> None:
"""Unregister a custom effect handler.
Args:
effect_kind: The effect kind to unregister
Raises:
KeyError: If no handler is registered for this effect kind
"""
if effect_kind not in self._effect_handlers:
raise KeyError(f"No handler registered for effect kind: {effect_kind}")
del self._effect_handlers[effect_kind]
self._logger.info(
"Custom effect handler unregistered",
effect_kind=effect_kind,
)
def get_registered_effect_kinds(self) -> list[str]:
"""Get list of all registered custom effect kinds.
Returns:
List of effect kind strings that have registered handlers
"""
return list(self._effect_handlers.keys())
def set_agent_backpressure_policy(self, agent_id: str, policy_name: str) -> None:
"""Set backpressure policy for a specific agent.
Args:
agent_id: Agent identifier
policy_name: Backpressure policy name (defer, shed_oldest, drop_newest)
Raises:
ValueError: If agent_id is not registered or policy_name is invalid
Requirements addressed:
- 10.2: Configurable backpressure policies per agent class
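        Example (sketch):
            ```python
            orchestrator.set_agent_backpressure_policy("agent_1", "shed_oldest")
            ```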
"""
if agent_id not in self.agent_handles:
raise ValueError(f"Agent {agent_id} is not registered")
if policy_name not in self._backpressure_manager.available_policies:
available = ", ".join(self._backpressure_manager.available_policies)
raise ValueError(
f"Invalid backpressure policy '{policy_name}'. Available: {available}"
)
self._agent_backpressure_policies[agent_id] = policy_name
self._logger.info(
"Agent backpressure policy updated",
agent_id=agent_id,
policy=policy_name,
)
async def broadcast_event(self, draft: EffectDraft) -> None:
"""Create complete effect from draft and distribute observations.
Converts EffectDraft to complete Effect with priority completion,
updates world state, generates observation deltas for affected agents,
and delivers observations using timed queues with latency models.
Args:
draft: Effect draft to process and broadcast
Raises:
ValueError: If draft is missing required fields
Requirements addressed:
- 2.2: Generate ObservationDelta patches for affected agents
- 2.5: Incremental updates via ObservationDelta patches
- 6.4: ObservationDelta delivery latency ≤ 20ms
- 6.5: Timed delivery using per-agent TimedQueues with latency models
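        Example (sketch; the payload and ids are illustrative):
            ```python
            await orchestrator.broadcast_event({
                "kind": "MessageEmitted",
                "payload": {"text": "hello"},
                "source_id": "agent_1",
                "schema_version": "1.0.0",
            })
            ```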
"""
# Enhanced telemetry for broadcast_event
async with async_performance_timer(
"broadcast_event",
agent_id=draft.get("source_id"),
global_seq=self._global_seq + 1,
logger=self._logger,
tracer_name="gunn.orchestrator",
):
# Validate draft
if not draft.get("kind"):
raise ValueError("EffectDraft must have 'kind' field")
if not draft.get("source_id"):
raise ValueError("EffectDraft must have 'source_id' field")
if not draft.get("schema_version"):
raise ValueError("EffectDraft must have 'schema_version' field")
# Priority completion: use config.default_priority if not specified
payload = draft.get("payload", {})
if "priority" not in payload:
payload = payload.copy()
payload["priority"] = self.config.default_priority
# Create complete effect with orchestrator-managed fields
effect: Effect = {
"uuid": uuid.uuid4().hex,
"kind": draft["kind"],
"payload": payload,
"source_id": draft["source_id"],
"schema_version": draft["schema_version"],
"sim_time": self._current_sim_time(),
"global_seq": self._next_seq(),
}
# Append to event log with world_id in source_metadata
await self.event_log.append(effect, req_id=None)
# Update world state based on effect
await self._apply_effect_to_world_state(effect)
# Check if snapshot should be created and create it if needed
await self.memory_manager.check_and_create_snapshot(
effect["global_seq"], self.world_state, effect["sim_time"]
)
# Check memory limits and trigger compaction if needed
await self.memory_manager.check_memory_limits(self.event_log)
# Generate and distribute observations to affected agents
await self._distribute_observations(effect)
async def submit_intent(self, intent: Intent) -> str:
"""Two-phase commit: idempotency → quota → backpressure → priority → fairness → validator → commit.
Args:
intent: Intent to process
Returns:
Request ID for tracking
Raises:
ValueError: If intent is invalid
StaleContextError: If intent context is stale
QuotaExceededError: If agent quota is exceeded
BackpressureError: If backpressure limits are exceeded
ValidationError: If intent validation fails
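        Example (sketch; field values are illustrative and ``handle`` is assumed
        to be this agent's AgentHandle):
            ```python
            req_id = await orchestrator.submit_intent({
                "req_id": "req-123",
                "agent_id": "agent_1",
                "kind": "Speak",
                "payload": {"text": "hello"},
                "context_seq": handle.get_view_seq(),
                "priority": 0,
                "schema_version": "1.0.0",
            })
            ```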
"""
# Validate intent structure
if not intent.get("req_id"):
raise ValueError("Intent must have 'req_id' field")
if not intent.get("agent_id"):
raise ValueError("Intent must have 'agent_id' field")
if not intent.get("kind"):
raise ValueError("Intent must have 'kind' field")
await self._ensure_initialized()
req_id = intent["req_id"]
agent_id = intent["agent_id"]
context_seq = intent.get("context_seq", 0)
# Enhanced telemetry for submit_intent
async with async_performance_timer(
"submit_intent",
agent_id=agent_id,
req_id=req_id,
global_seq=self._global_seq,
view_seq=context_seq,
logger=self._logger,
tracer_name="gunn.orchestrator",
):
try:
# Phase 1: Idempotency check using persistent store
existing_seq = await self._dedup_store.check_and_record(
self.world_id, agent_id, req_id, self._global_seq + 1
)
if existing_seq is not None:
self._logger.info(
"Intent already processed (idempotent)",
req_id=req_id,
agent_id=agent_id,
existing_seq=existing_seq,
)
return req_id
# Phase 2: Agent validation
if agent_id not in self.agent_handles:
raise ValueError(f"Agent {agent_id} is not registered")
# Phase 3: Staleness detection
_ = self.agent_handles[agent_id]
staleness = self._global_seq - context_seq
if staleness > self.config.staleness_threshold:
# Record staleness conflict
record_conflict(agent_id, "staleness")
raise StaleContextError(
req_id,
context_seq,
self._global_seq,
self.config.staleness_threshold,
)
# Phase 4: Quota checking
await self._check_quota(agent_id, intent)
# Phase 5: Backpressure checking
await self._check_backpressure(agent_id)
# Phase 6: Priority assignment and fairness scheduling
priority = intent.get("priority", self.config.default_priority)
# Normalize priority to 0-2 range (0=high, 1=normal, 2=low)
normalized_priority = max(0, min(2, 2 - priority // 10))
# Enqueue for fair processing
self._scheduler.enqueue(intent, normalized_priority)
self._logger.info(
"Intent enqueued for processing",
req_id=req_id,
agent_id=agent_id,
priority=priority,
normalized_priority=normalized_priority,
queue_depth=self._scheduler.get_queue_depth(agent_id),
)
self._start_processing_loop()
# Record successful intent throughput
record_intent_throughput(agent_id, intent["kind"], "success")
return req_id
except StaleContextError:
record_intent_throughput(agent_id, intent["kind"], "stale")
raise
except QuotaExceededError:
record_intent_throughput(agent_id, intent["kind"], "quota_exceeded")
record_conflict(agent_id, "quota")
raise
except BackpressureError:
record_intent_throughput(agent_id, intent["kind"], "backpressure")
record_conflict(agent_id, "backpressure")
raise
except ValidationError:
record_intent_throughput(agent_id, intent["kind"], "validation_failed")
record_conflict(agent_id, "validation")
raise
except Exception:
record_intent_throughput(agent_id, intent["kind"], "error")
raise
async def submit_intents(
self, intents: list[Intent], sim_time: float | None = None
) -> list[str]:
"""Submit multiple intents for simultaneous processing at the same sim_time.
This method enables true simultaneous intent execution within a single tick,
maintaining deterministic ordering through (sim_time, priority, source_id, uuid).
Args:
intents: List of intents to process simultaneously
sim_time: Optional explicit sim_time for all intents (default: current time)
Returns:
List of request IDs for tracking
Raises:
ValueError: If intents list is empty or contains invalid intents
StaleContextError: If any intent context is stale
QuotaExceededError: If any agent quota is exceeded
BackpressureError: If backpressure limits are exceeded
ValidationError: If any intent validation fails
Notes:
- All intents are validated before any are enqueued
- If any validation fails, none are processed (atomic batch)
- Intents are ordered by priority within the same sim_time
- Use this for "speak + move" or other simultaneous actions
"""
if not intents:
raise ValueError("intents list cannot be empty")
await self._ensure_initialized()
# Use explicit sim_time or current orchestrator time
effective_sim_time = (
sim_time if sim_time is not None else self._get_current_sim_time()
)
req_ids: list[str] = []
validated_intents: list[
tuple[Intent, int]
] = [] # (intent, normalized_priority)
# Phase 1: Validate all intents atomically
for intent in intents:
if not intent.get("req_id"):
raise ValueError("All intents must have 'req_id' field")
if not intent.get("agent_id"):
raise ValueError("All intents must have 'agent_id' field")
if not intent.get("kind"):
raise ValueError("All intents must have 'kind' field")
req_id = intent["req_id"]
agent_id = intent["agent_id"]
context_seq = intent.get("context_seq", 0)
# Check idempotency
existing_seq = await self._dedup_store.check_and_record(
self.world_id, agent_id, req_id, self._global_seq + 1
)
if existing_seq is not None:
self._logger.info(
"Intent already processed (idempotent)",
req_id=req_id,
agent_id=agent_id,
existing_seq=existing_seq,
)
req_ids.append(req_id)
continue
# Validate agent registration
if agent_id not in self.agent_handles:
raise ValueError(f"Agent {agent_id} is not registered")
# Check staleness
staleness = self._global_seq - context_seq
if staleness > self.config.staleness_threshold:
record_conflict(agent_id, "staleness")
raise StaleContextError(
req_id,
context_seq,
self._global_seq,
self.config.staleness_threshold,
)
# Check quota
await self._check_quota(agent_id, intent)
# Check backpressure
await self._check_backpressure(agent_id)
# Normalize priority
priority = intent.get("priority", self.config.default_priority)
normalized_priority = max(0, min(2, 2 - priority // 10))
validated_intents.append((intent, normalized_priority))
req_ids.append(req_id)
# Phase 2: Enqueue all validated intents
for intent, normalized_priority in validated_intents:
self._scheduler.enqueue(intent, normalized_priority)
self._logger.info(
"Intent enqueued for simultaneous processing",
req_id=intent["req_id"],
agent_id=intent["agent_id"],
priority=intent.get("priority", self.config.default_priority),
normalized_priority=normalized_priority,
sim_time=effective_sim_time,
batch_size=len(intents),
)
record_intent_throughput(intent["agent_id"], intent["kind"], "success")
# Start processing if not already running
self._start_processing_loop()
return req_ids
async def submit_intents_batch(
self,
intents: list[Intent],
sim_time: float | None = None,
processing_mode: ProcessingMode | None = None,
timeout: float | None = None,
) -> BatchResult:
"""Submit multiple intents using concurrent processing capabilities.
This method provides enhanced batch processing with configurable concurrency
modes for improved performance in multi-agent scenarios.
Args:
intents: List of intents to process
sim_time: Optional explicit sim_time for all intents
processing_mode: Processing mode (sequential, concurrent, deterministic_concurrent)
timeout: Timeout for concurrent operations
Returns:
BatchResult containing effects, errors, and processing metadata
Raises:
ValueError: If intents list is empty or contains invalid intents
asyncio.TimeoutError: If processing exceeds timeout
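        Example (sketch; relies only on the BatchResult fields documented above):
            ```python
            result = await orchestrator.submit_intents_batch(intents, timeout=5.0)
            print(len(result.effects), "effects,", len(result.errors), "errors")
            ```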
"""
if not intents:
return BatchResult(
effects=[],
errors={},
processing_time=0.0,
metadata={"mode": "empty", "intent_count": 0},
)
await self._ensure_initialized()
# Set sim_time for all intents if provided
if sim_time is not None:
for intent in intents:
intent["sim_time"] = sim_time
# Use concurrent processor for batch processing
result = await self._concurrent_processor.process_batch(
intents=intents, mode=processing_mode, timeout=timeout
)
# Start processing loop to handle any queued intents
self._start_processing_loop()
# Log batch processing results
self._logger.info(
"Batch intent processing completed",
intent_count=len(intents),
effect_count=len(result.effects),
error_count=len(result.errors),
processing_time=result.processing_time,
mode=result.metadata.get("mode", "unknown"),
)
return result
def _get_current_sim_time(self) -> float:
"""Get current simulation time from world state or system time."""
return self.world_state.metadata.get("last_effect_time", 0.0)
async def _check_quota(self, agent_id: str, intent: Intent) -> None:
"""Check if agent has quota available for this intent.
Args:
agent_id: Agent identifier
intent: Intent to check quota for
Raises:
QuotaExceededError: If quota is exceeded
"""
current_time = time.time()
# Initialize quota tracking for agent if needed
if agent_id not in self._quota_tracker:
self._quota_tracker[agent_id] = {
"intents": [],
"tokens": [],
}
agent_quota = self._quota_tracker[agent_id]
# Clean up old quota entries (older than 1 minute)
cutoff_time = current_time - 60.0
agent_quota["intents"] = [t for t in agent_quota["intents"] if t > cutoff_time]
agent_quota["tokens"] = [t for t in agent_quota["tokens"] if t > cutoff_time]
# Check intent quota
if len(agent_quota["intents"]) >= self.config.quota_intents_per_minute:
raise QuotaExceededError(
agent_id,
"intents_per_minute",
self.config.quota_intents_per_minute,
len(agent_quota["intents"]),
)
# Estimate token usage (simplified)
estimated_tokens = len(str(intent.get("payload", {}))) // 4 # Rough estimate
if (
len(agent_quota["tokens"]) + estimated_tokens
> self.config.quota_tokens_per_minute
):
raise QuotaExceededError(
agent_id,
"tokens_per_minute",
self.config.quota_tokens_per_minute,
len(agent_quota["tokens"]) + estimated_tokens,
)
        # Record quota usage: one timestamp per estimated token, so the
        # sliding-window token count above stays a simple len() check
        agent_quota["intents"].append(current_time)
        for _ in range(estimated_tokens):
            agent_quota["tokens"].append(current_time)
async def _check_backpressure(self, agent_id: str) -> None:
"""Check backpressure limits for agent.
Args:
agent_id: Agent identifier
Raises:
BackpressureError: If backpressure limits are exceeded
"""
agent_queue_depth = self._scheduler.get_queue_depth(agent_id)
# Get agent-specific backpressure policy or use default
policy_name = self._agent_backpressure_policies.get(
agent_id, self.config.backpressure_policy
)
# Check per-agent queue depth
if agent_queue_depth >= self.config.max_queue_depth:
# Record high watermark and backpressure event
record_queue_high_watermark(agent_id, "agent_queue", agent_queue_depth)
record_backpressure_event(agent_id, "agent_queue", policy_name)
raise BackpressureError(
agent_id,
"agent_queue",
agent_queue_depth,
self.config.max_queue_depth,
policy_name,
)
# Check total system queue depth
total_depth = self._scheduler.get_queue_depth()
system_threshold = self.config.max_queue_depth * self.config.max_agents // 2
if total_depth >= system_threshold:
# Record high watermark and backpressure event
record_queue_high_watermark(agent_id, "system_queue", total_depth)
record_backpressure_event(agent_id, "system_queue", policy_name)
raise BackpressureError(
agent_id,
"system_queue",
total_depth,
system_threshold,
policy_name,
)
def issue_cancel_token(
self, agent_id: str, req_id: str, context_digest: str | None = None
) -> CancelToken:
"""Issue cancellation token for generation tracking.
Creates a new cancellation token with tuple key tracking for the given
agent and request. The token can be used to cancel long-running operations
like LLM generation when context becomes stale.
Args:
agent_id: Agent identifier
req_id: Request identifier
context_digest: Optional context digest for staleness detection
Returns:
CancelToken for tracking cancellation
Raises:
ValueError: If agent_id or req_id is empty
Requirements addressed:
- 4.1: Issue cancel_token with current context_digest
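        Example (sketch; the ``generate()`` loop is hypothetical):
            ```python
            token = orchestrator.issue_cancel_token("agent_1", "req-123")
            for chunk in generate():
                if token.cancelled:
                    break  # context went stale mid-generation
            ```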
"""
if not agent_id.strip():
raise ValueError("agent_id cannot be empty")
if not req_id.strip():
raise ValueError("req_id cannot be empty")
# Create token with context information
token = CancelToken(req_id, agent_id)
key = (self.world_id, agent_id, req_id)
# Clean up any existing token for this key
if key in self._cancel_tokens:
old_token = self._cancel_tokens[key]
if not old_token.cancelled:
old_token.cancel("replaced_by_new_token")
self._cancel_tokens[key] = token
self._logger.info(
"Cancel token issued",
agent_id=agent_id,
req_id=req_id,
context_digest=context_digest,
total_active_tokens=len(
[t for t in self._cancel_tokens.values() if not t.cancelled]
),
)
return token
async def cancel_if_stale(
self, agent_id: str, req_id: str, new_view_seq: int
) -> bool:
"""Check staleness and cancel if needed with debounce logic.
Evaluates whether the context has become stale based on the staleness
threshold and applies debounce logic to prevent rapid successive
interruptions.
Args:
agent_id: Agent identifier
req_id: Request identifier
new_view_seq: New view sequence number to check against
Returns:
True if cancellation occurred
Requirements addressed:
- 4.2: Evaluate staleness using latest_view_seq > context_seq + staleness_threshold
- 4.3: Cancel current generation when context becomes stale
- 4.7: Suppress rapid successive interruptions within debounce window
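        Example (sketch; ``replan`` is a hypothetical recovery step):
            ```python
            if await orchestrator.cancel_if_stale("agent_1", "req-123", latest_seq):
                await replan()
            ```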
"""
key = (self.world_id, agent_id, req_id)
if key not in self._cancel_tokens:
return False
token = self._cancel_tokens[key]
if token.cancelled:
return True
# Get agent's current view sequence
agent_handle = self.agent_handles.get(agent_id)
if not agent_handle:
return False
# Check debounce logic to prevent rapid successive interruptions
current_time = time.time()
last_cancellation = self._last_cancellation_time.get(agent_id, 0)
debounce_window_s = self.config.debounce_ms / 1000.0
if current_time - last_cancellation < debounce_window_s:
self._logger.debug(
"Cancellation suppressed by debounce",
agent_id=agent_id,
req_id=req_id,
time_since_last_ms=(current_time - last_cancellation) * 1000,
debounce_ms=self.config.debounce_ms,
)
return False
# Check staleness threshold
staleness = new_view_seq - agent_handle.view_seq
if staleness > self.config.staleness_threshold:
# Record cancellation time for debounce
self._last_cancellation_time[agent_id] = current_time
# Cancel with detailed reason
reason = f"stale_due_to_seq_gap_{staleness}_threshold_{self.config.staleness_threshold}"
token.cancel(reason)
self._logger.info(
"Intent cancelled due to staleness",
agent_id=agent_id,
req_id=req_id,
staleness=staleness,
threshold=self.config.staleness_threshold,
new_view_seq=new_view_seq,
agent_view_seq=agent_handle.view_seq,
reason=reason,
)
return True
return False
def set_agent_interrupt_policy(self, agent_id: str, policy: str) -> None:
"""Set interrupt policy for an agent.
Args:
agent_id: Agent identifier
policy: Interrupt policy ("always" or "only_conflict")
Raises:
ValueError: If policy is not valid
Requirements addressed:
- 4.5: interrupt_on_new_info policy "always" triggers on any new information
- 4.6: interrupt_on_new_info policy "only_conflict" triggers only on conflicts
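        Example (sketch):
            ```python
            orchestrator.set_agent_interrupt_policy("agent_1", "only_conflict")
            ```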
"""
if policy not in ("always", "only_conflict"):
raise ValueError(
f"Invalid interrupt policy: {policy}. Must be 'always' or 'only_conflict'"
)
self._agent_interrupt_policies[agent_id] = policy
self._logger.info(
"Agent interrupt policy set",
agent_id=agent_id,
policy=policy,
)
def get_agent_interrupt_policy(self, agent_id: str) -> str:
"""Get interrupt policy for an agent.
Args:
agent_id: Agent identifier
Returns:
Interrupt policy ("always" or "only_conflict"), defaults to "always"
"""
return self._agent_interrupt_policies.get(agent_id, "always")
async def check_and_cancel_stale_tokens(self, effect: Effect) -> list[str]:
"""Check all active tokens for staleness and cancel if needed.
This method is called during observation distribution to automatically
cancel tokens when new events make their context stale.
Args:
effect: New effect that might make contexts stale
Returns:
List of request IDs that were cancelled
Requirements addressed:
- 4.2: Evaluate staleness when new events occur during generation
- 4.3: Cancel current generation when context becomes stale
- 4.4: Provide cancellation reason to the agent
"""
cancelled_req_ids = []
current_global_seq = effect["global_seq"]
# Check all active cancel tokens
for key, token in list(self._cancel_tokens.items()):
if token.cancelled:
continue
world_id, agent_id, req_id = key
if world_id != self.world_id:
continue
# Get agent's interrupt policy
interrupt_policy = self.get_agent_interrupt_policy(agent_id)
# Check if this effect should trigger interruption
should_interrupt = False
if interrupt_policy == "always":
# Any new information triggers interruption
should_interrupt = True
elif interrupt_policy == "only_conflict":
                # Only conflicting information triggers interruption. For now,
                # any effect from a different agent counts as potentially
                # conflicting; this can be refined with real conflict detection.
should_interrupt = effect["source_id"] != agent_id
if should_interrupt:
# Use the enhanced cancel_if_stale method
# The new_view_seq should be the current global_seq since that's what the agent will see
was_cancelled = await self.cancel_if_stale(
agent_id, req_id, current_global_seq
)
if was_cancelled:
cancelled_req_ids.append(req_id)
if cancelled_req_ids:
self._logger.info(
"Automatic cancellation due to new events",
effect_kind=effect["kind"],
effect_source=effect["source_id"],
global_seq=current_global_seq,
cancelled_count=len(cancelled_req_ids),
cancelled_req_ids=cancelled_req_ids,
)
return cancelled_req_ids
def cleanup_cancelled_tokens(self) -> int:
"""Clean up cancelled tokens to prevent memory leaks.
Returns:
Number of tokens cleaned up
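        Example:
            Illustrative usage, e.g. from a periodic maintenance task::

                removed = orchestrator.cleanup_cancelled_tokens()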
"""
initial_count = len(self._cancel_tokens)
# Remove cancelled tokens
self._cancel_tokens = {
key: token
for key, token in self._cancel_tokens.items()
if not token.cancelled
}
cleaned_count = initial_count - len(self._cancel_tokens)
if cleaned_count > 0:
self._logger.debug(
"Cleaned up cancelled tokens",
cleaned_count=cleaned_count,
remaining_count=len(self._cancel_tokens),
)
return cleaned_count
def get_active_cancel_tokens(self) -> dict[str, list[str]]:
"""Get active cancel tokens grouped by agent.
Returns:
Dictionary mapping agent_id to list of active req_ids
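        Example:
            Illustrative sketch; the returned mapping might look like
            ``{"agent_a": ["req_1", "req_2"]}`` (values hypothetical)::

                active = orchestrator.get_active_cancel_tokens()
                for agent_id, req_ids in active.items():
                    ...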
"""
active_tokens: dict[str, list[str]] = {}
for key, token in self._cancel_tokens.items():
if not token.cancelled:
world_id, agent_id, req_id = key
if world_id == self.world_id:
if agent_id not in active_tokens:
active_tokens[agent_id] = []
active_tokens[agent_id].append(req_id)
return active_tokens
def _next_seq(self) -> int:
"""Get next global sequence number.
Returns:
Next monotonically increasing sequence number
"""
self._global_seq += 1
# Update global sequence metric
update_global_seq(self._global_seq)
return self._global_seq
def _current_sim_time(self) -> float:
"""Get current simulation time based on authority.
Returns:
Current simulation time in seconds
"""
if self._sim_time_authority == "none":
# Use monotonic clock when no external authority
return MonotonicClock.now()
else:
# TODO: Get time from external authority (Unity/Unreal)
# For now, fall back to monotonic clock
return MonotonicClock.now()
def get_agent_count(self) -> int:
"""Get number of registered agents.
Returns:
Number of currently registered agents
"""
return len(self.agent_handles)
def get_latest_seq(self) -> int:
"""Get latest global sequence number.
Returns:
Latest global sequence number
"""
return self._global_seq
def get_world_state(self) -> WorldState:
"""Get current world state.
Returns:
Current world state (read-only)
"""
return self.world_state
async def _intent_processing_loop(self) -> None:
"""Background loop for processing intents from the scheduler."""
self._logger.info("Intent processing loop started")
last_cleanup_time = time.time()
cleanup_interval_s = 30.0 # Clean up cancelled tokens every 30 seconds
try:
while not self._shutdown_event.is_set():
try:
# Periodic cleanup of cancelled tokens
current_time = time.time()
if current_time - last_cleanup_time >= cleanup_interval_s:
cleaned_count = self.cleanup_cancelled_tokens()
last_cleanup_time = current_time
if cleaned_count > 0:
self._logger.debug(
"Periodic token cleanup completed",
cleaned_count=cleaned_count,
remaining_tokens=len(self._cancel_tokens),
)
# Batch dequeue multiple intents for parallel processing
intents_to_process = []
max_batch_size = 5 # Process up to 5 intents in parallel
# Collect available intents up to batch size
for _ in range(max_batch_size):
intent = self._scheduler.dequeue()
if intent is None:
break
intents_to_process.append(intent)
if not intents_to_process:
                        # No intents available; sleep briefly (1 ms) to keep idle latency low
                        await asyncio.sleep(0.001)
continue
# Process intents in parallel
tasks = [
asyncio.create_task(self._process_intent(intent))
for intent in intents_to_process
]
# Wait for all tasks to complete
await asyncio.gather(*tasks, return_exceptions=True)
except Exception as e:
self._logger.error(
"Error in intent processing loop",
error=str(e),
error_type=type(e).__name__,
)
# Continue processing other intents
await asyncio.sleep(0.1)
except asyncio.CancelledError:
self._logger.info("Intent processing loop cancelled")
raise
except Exception as e:
self._logger.error(
"Intent processing loop failed",
error=str(e),
error_type=type(e).__name__,
)
finally:
self._processing_task = None
if not self._shutdown_event.is_set():
# Auto-stop triggered by idle timeout: close dedup store to avoid hanging tasks
try:
await self._dedup_store.close()
finally:
self._initialized = False
self._initialize_lock = None
self._logger.info(
"Intent processing loop exited (idle auto-stop)",
idle_timeout_ms=self.config.processing_idle_shutdown_ms,
)
else:
self._logger.info("Intent processing loop exited")
async def _process_intent(self, intent: Intent) -> None:
"""Process a single intent through validation and effect creation.
Args:
intent: Intent to process
"""
req_id = intent["req_id"]
agent_id = intent["agent_id"]
start_time = time.perf_counter()
try:
# Phase 7: Validation
validation_failures = []
if not self.effect_validator.validate_intent(intent, self.world_state):
validation_failures.append("effect_validator_failed")
# Additional validation checks can be added here
if validation_failures:
raise ValidationError(intent, validation_failures)
# Phase 8: Commit - Create effect from validated intent
effect_draft: EffectDraft = {
"kind": intent["kind"],
"payload": intent["payload"],
"source_id": agent_id,
"schema_version": intent.get("schema_version", "1.0.0"),
}
# Broadcast the effect (this will assign the global_seq)
await self.broadcast_event(effect_draft)
processing_time_ms = (time.perf_counter() - start_time) * 1000
self._logger.info(
"Intent processed successfully",
req_id=req_id,
agent_id=agent_id,
intent_kind=intent["kind"],
global_seq=self._global_seq,
processing_time_ms=processing_time_ms,
)
except Exception as e:
processing_time_ms = (time.perf_counter() - start_time) * 1000
self._logger.error(
"Intent processing failed",
req_id=req_id,
agent_id=agent_id,
error=str(e),
error_type=type(e).__name__,
processing_time_ms=processing_time_ms,
)
# Intent processing failure - could implement retry logic here
async def _process_single_intent_for_concurrent(
self, intent: Intent
) -> list[Effect]:
"""Process a single intent and return effects for concurrent processing.
This method is used by the concurrent processor to handle individual intents
and return the generated effects rather than broadcasting them immediately.
Args:
intent: Intent to process
Returns:
List of effects generated from the intent
Raises:
ValidationError: If intent validation fails
Exception: Any other processing error
"""
req_id = intent["req_id"]
agent_id = intent["agent_id"]
# Validate the intent
if not self.effect_validator.validate_intent(intent, self.world_state):
raise ValidationError(intent, ["effect_validator_failed"])
# Create effect from validated intent
effect: Effect = {
"kind": intent["kind"],
"payload": intent["payload"],
"source_id": agent_id,
"schema_version": intent.get("schema_version", "1.0.0"),
"global_seq": self._global_seq
+ 1, # Will be properly assigned when broadcast
"sim_time": intent.get("sim_time", self._get_current_sim_time()),
}
# For concurrent processing, we return the effect rather than broadcasting it
# The concurrent processor will handle broadcasting in the correct order
return [effect]
def get_processing_stats(self) -> dict[str, Any]:
"""Get intent processing statistics.
Returns:
Dictionary with processing statistics
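        Example:
            Illustrative sketch of the returned structure (values shown are
            hypothetical)::

                stats = orchestrator.get_processing_stats()
                stats["global_seq"]          # e.g. 1042
                stats["agents_registered"]   # e.g. 3
                stats["scheduler"]           # scheduler-specific stats dict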
"""
scheduler_stats = self._scheduler.get_stats()
return {
"scheduler": scheduler_stats,
"quota_tracker": {
agent_id: {
"intents_count": len(quotas["intents"]),
"tokens_count": len(quotas["tokens"]),
}
for agent_id, quotas in self._quota_tracker.items()
},
"global_seq": self._global_seq,
"cancel_tokens_active": len(self._cancel_tokens),
"agents_registered": len(self.agent_handles),
}
async def _apply_effect_to_world_state(self, effect: Effect) -> None:
"""Apply effect to world state based on effect kind and payload.
Args:
effect: Effect to apply to world state
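        Example:
            Illustrative Move effect shape handled by this method (field
            names follow the handling code below; values hypothetical)::

                effect = {
                    "kind": "Move",
                    "payload": {"to": [1.0, 2.0]},  # 2D is lifted to 3D with z=0.0
                    "source_id": "agent_a",
                    "global_seq": 7,
                    "sim_time": 12.5,
                    "schema_version": "1.0.0",
                }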
"""
effect_kind = effect["kind"]
payload = effect["payload"]
source_id = effect["source_id"]
try:
if effect_kind == "Move":
# Handle unified position format: "to" field (new) or "position" field (legacy)
position = payload.get("to") or payload.get("position")
if position and isinstance(position, (list, tuple)):
# Convert 2D to 3D if needed
if len(position) == 2:
position = [float(position[0]), float(position[1]), 0.0]
elif len(position) >= 3:
position = [
float(position[0]),
float(position[1]),
float(position[2]),
]
else:
position = None
if position:
# Update spatial index
self.world_state.spatial_index[source_id] = tuple(position)
# Update entity data - ensure position is always synchronized
if source_id not in self.world_state.entities:
self.world_state.entities[source_id] = {}
entity_data = self.world_state.entities[source_id]
if isinstance(entity_data, dict):
entity_data.update(
{
"position": position, # Canonical position field
"last_position": position, # Kept for backward compatibility
"last_move_time": effect["sim_time"],
}
)
elif effect_kind == "Speak" or effect_kind == "SpeakResponse":
# Update entity with speaking information
if source_id not in self.world_state.entities:
self.world_state.entities[source_id] = {}
entity_data = self.world_state.entities[source_id]
if isinstance(entity_data, dict):
entity_data.update(
{
"last_message": payload.get("text", ""),
"last_speak_time": effect["sim_time"],
"message_count": entity_data.get("message_count", 0) + 1,
}
)
elif effect_kind == "Interact":
# Handle interaction between entities
target_id = payload.get("target_id")
if target_id:
# Update relationships
if source_id not in self.world_state.relationships:
self.world_state.relationships[source_id] = []
if target_id not in self.world_state.relationships[source_id]:
self.world_state.relationships[source_id].append(target_id)
# Bidirectional relationship
if target_id not in self.world_state.relationships:
self.world_state.relationships[target_id] = []
if source_id not in self.world_state.relationships[target_id]:
self.world_state.relationships[target_id].append(source_id)
elif effect_kind == "EntityCreated":
# Create new entity
entity_id = payload.get("entity_id", source_id)
entity_data = payload.get("entity_data", {})
self.world_state.entities[entity_id] = entity_data
# Set initial position if provided - synchronize both representations
if "position" in payload:
position = payload["position"]
                    if isinstance(position, (list, tuple)) and len(position) >= 3:
position_tuple = (
float(position[0]),
float(position[1]),
float(position[2]),
)
self.world_state.spatial_index[entity_id] = position_tuple
# Also set in entity data for consistency
if isinstance(entity_data, dict):
entity_data["position"] = list(position_tuple)
elif effect_kind == "EntityRemoved":
# Remove entity
entity_id = payload.get("entity_id", source_id)
self.world_state.entities.pop(entity_id, None)
self.world_state.spatial_index.pop(entity_id, None)
self.world_state.relationships.pop(entity_id, None)
# Remove from other entities' relationships
for relations in self.world_state.relationships.values():
if entity_id in relations:
relations.remove(entity_id)
elif effect_kind == "MessageEmitted":
# Handle message emission (similar to Speak but more general)
if source_id not in self.world_state.entities:
self.world_state.entities[source_id] = {}
entity_data = self.world_state.entities[source_id]
if isinstance(entity_data, dict):
entity_data.update(
{
"last_message": payload.get(
"text", payload.get("message", "")
),
"last_emit_time": effect["sim_time"],
"emit_count": entity_data.get("emit_count", 0) + 1,
}
)
else:
# Custom effect kinds - delegate to registered handlers
if (
hasattr(self, "_effect_handlers")
and effect_kind in self._effect_handlers
):
handler = self._effect_handlers[effect_kind]
await handler(effect, self.world_state)
# Update world metadata
self.world_state.metadata.update(
{
"last_effect_seq": effect["global_seq"],
"last_effect_time": effect["sim_time"],
"last_effect_kind": effect_kind,
}
)
self._logger.debug(
"Effect applied to world state",
effect_kind=effect_kind,
source_id=source_id,
global_seq=effect["global_seq"],
)
except Exception as e:
self._logger.error(
"Failed to apply effect to world state",
effect_kind=effect_kind,
source_id=source_id,
global_seq=effect["global_seq"],
error=str(e),
error_type=type(e).__name__,
)
# Don't re-raise - world state update failures shouldn't break observation distribution
async def _distribute_observations(self, effect: Effect) -> None:
"""Generate and distribute observation deltas to affected agents.
Also checks for stale cancel tokens and automatically cancels them
when new events occur during generation.
Args:
effect: Effect that occurred and needs to be observed
Requirements addressed:
- 4.2: Evaluate staleness when new events occur during generation
- 4.3: Cancel current generation when context becomes stale
"""
if not self.agent_handles:
return # No agents to notify
distribution_start = time.perf_counter()
agents_notified = 0
# First, check for stale tokens and cancel them automatically
_ = await self.check_and_cancel_stale_tokens(effect)
# Parallel observation generation for better performance
async def process_agent_observation(
agent_id: str, agent_handle: AgentHandle
) -> tuple[str, bool]:
try:
# Check if agent should observe this effect
observation_policy = self.observation_policies[agent_id]
should_observe = observation_policy.should_observe_event(
effect, agent_id, self.world_state
)
if not should_observe:
return agent_id, False
# Generate new view for the agent
new_view = observation_policy.filter_world_state(
self.world_state, agent_id
)
new_view.view_seq = effect["global_seq"]
# Get agent's current view to generate delta
current_view_seq = agent_handle.view_seq
                # First observation (no previous view): deliver the full state as a delta
                if current_view_seq == 0:
observation_delta = ObservationDelta(
view_seq=new_view.view_seq,
patches=[
{
"op": "replace",
"path": "/visible_entities",
"value": new_view.visible_entities,
},
{
"op": "replace",
"path": "/visible_relationships",
"value": new_view.visible_relationships,
},
],
context_digest=new_view.context_digest,
schema_version="1.0.0",
)
else:
# Generate incremental delta
# For now, create a simplified delta based on the effect
observation_delta = self._create_effect_based_delta(
effect, new_view, agent_id, observation_policy
)
# Calculate delivery delay using latency model
delivery_delay = observation_policy.latency_model.calculate_delay(
effect["source_id"], agent_id, effect
)
# Schedule delivery via timed queue
agent_queue = self._per_agent_queues[agent_id]
loop = asyncio.get_running_loop()
deliver_at = loop.time() + delivery_delay
await agent_queue.put_at(deliver_at, observation_delta)
# Record observation delivery latency and bandwidth
record_observation_delivery_latency(agent_id, delivery_delay)
# Calculate patch size for bandwidth monitoring
import orjson
patch_size = len(orjson.dumps(observation_delta))
patch_count = len(observation_delta.get("patches", []))
bandwidth_monitor.record_patch_bandwidth(
agent_id, patch_size, patch_count, False
)
self._logger.debug(
"Observation scheduled for delivery",
agent_id=agent_id,
effect_kind=effect["kind"],
global_seq=effect["global_seq"],
delivery_delay_ms=delivery_delay * 1000,
view_seq=new_view.view_seq,
)
return agent_id, True
except Exception as e:
self._logger.error(
"Failed to generate observation for agent",
agent_id=agent_id,
effect_kind=effect["kind"],
global_seq=effect["global_seq"],
error=str(e),
error_type=type(e).__name__,
)
return agent_id, False
# Execute all agent observation processing in parallel
tasks = [
asyncio.create_task(process_agent_observation(agent_id, agent_handle))
for agent_id, agent_handle in self.agent_handles.items()
]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Count successful notifications
agents_notified = sum(
1 for result in results if isinstance(result, tuple) and result[1]
)
distribution_time_ms = (time.perf_counter() - distribution_start) * 1000
self._logger.info(
"Observation distribution complete",
effect_kind=effect["kind"],
global_seq=effect["global_seq"],
agents_notified=agents_notified,
total_agents=len(self.agent_handles),
distribution_time_ms=distribution_time_ms,
)
def _create_effect_based_delta(
self,
effect: Effect,
new_view: Any,
agent_id: str,
observation_policy: ObservationPolicy,
) -> ObservationDelta:
"""Create observation delta based on effect type and content.
Args:
effect: Effect that occurred
new_view: New view for the agent
agent_id: Agent receiving the observation
observation_policy: Agent's observation policy
Returns:
ObservationDelta dictionary with patches
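        Example:
            Illustrative RFC 6902 patch emitted for a visible "Speak"
            effect (values hypothetical)::

                {
                    "op": "replace",
                    "path": "/visible_entities/agent_a/last_message",
                    "value": "hello",
                }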
"""
effect_kind = effect["kind"]
payload = effect["payload"]
source_id = effect["source_id"]
patches = []
try:
if effect_kind == "Move":
# Update position in visible entities
if source_id in new_view.visible_entities:
patches.append(
{
"op": "replace",
"path": f"/visible_entities/{source_id}/last_position",
"value": payload.get("position"),
}
)
patches.append(
{
"op": "replace",
"path": f"/visible_entities/{source_id}/last_move_time",
"value": effect["sim_time"],
}
)
elif effect_kind in ["Speak", "SpeakResponse", "MessageEmitted"]:
# Update message in visible entities
if source_id in new_view.visible_entities:
message_field = "last_message"
message_value = payload.get("text", payload.get("message", ""))
patches.append(
{
"op": "replace",
"path": f"/visible_entities/{source_id}/{message_field}",
"value": message_value,
}
)
                # MessageEmitted updates last_emit_time in the world state,
                # while Speak/SpeakResponse update last_speak_time
                time_field = (
                    "last_emit_time"
                    if effect_kind == "MessageEmitted"
                    else "last_speak_time"
                )
                patches.append(
                    {
                        "op": "replace",
                        "path": f"/visible_entities/{source_id}/{time_field}",
                        "value": effect["sim_time"],
                    }
                )
elif effect_kind == "Interact":
# Update relationships if both entities are visible
target_id = payload.get("target_id")
if (
target_id
and source_id in new_view.visible_entities
and target_id in new_view.visible_entities
):
# Add relationship if not already present
if source_id in new_view.visible_relationships:
if target_id not in new_view.visible_relationships[source_id]:
patches.append(
{
"op": "add",
"path": f"/visible_relationships/{source_id}/-",
"value": target_id,
}
)
else:
patches.append(
{
"op": "add",
"path": f"/visible_relationships/{source_id}",
"value": [target_id],
}
)
elif effect_kind == "EntityCreated":
# Add new entity if it should be visible
entity_id = payload.get("entity_id", source_id)
if entity_id in new_view.visible_entities:
patches.append(
{
"op": "add",
"path": f"/visible_entities/{entity_id}",
"value": new_view.visible_entities[entity_id],
}
)
elif effect_kind == "EntityRemoved":
# Remove entity if it was visible
entity_id = payload.get("entity_id", source_id)
patches.append(
{
"op": "remove",
"path": f"/visible_entities/{entity_id}",
}
)
# Also remove from relationships
if entity_id in new_view.visible_relationships:
patches.append(
{
"op": "remove",
"path": f"/visible_relationships/{entity_id}",
}
)
# Check if patches exceed max_patch_ops threshold
if len(patches) > observation_policy.config.max_patch_ops:
# Fallback to full snapshot
patches = [
{
"op": "replace",
"path": "/visible_entities",
"value": new_view.visible_entities,
},
{
"op": "replace",
"path": "/visible_relationships",
"value": new_view.visible_relationships,
},
]
except Exception as e:
self._logger.warning(
"Failed to create effect-based delta, falling back to full snapshot",
effect_kind=effect_kind,
agent_id=agent_id,
error=str(e),
)
# Fallback to full snapshot
patches = [
{
"op": "replace",
"path": "/visible_entities",
"value": new_view.visible_entities,
},
{
"op": "replace",
"path": "/visible_relationships",
"value": new_view.visible_relationships,
},
]
return ObservationDelta(
view_seq=new_view.view_seq,
patches=patches,
context_digest=new_view.context_digest,
schema_version="1.0.0",
)
def get_memory_stats(self) -> dict[str, Any]:
"""Get comprehensive memory management statistics.
Returns:
Dictionary with detailed memory statistics including log entries,
snapshots, view cache, and compaction information.
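        Example:
            Illustrative periodic check (the "log_entries" key and the
            threshold are hypothetical)::

                stats = orchestrator.get_memory_stats()
                if stats.get("log_entries", 0) > 100_000:
                    await orchestrator.force_compaction()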
"""
return self.memory_manager.get_detailed_stats(self.event_log)
async def force_compaction(self) -> int:
"""Force log compaction regardless of thresholds.
Returns:
Number of entries removed during compaction
"""
return await self.memory_manager.compact_log(self.event_log)
async def create_snapshot(self) -> Any:
"""Force creation of a WorldState snapshot.
Returns:
Created snapshot
"""
return await self.memory_manager.snapshot_manager.create_snapshot(
self._global_seq, self.world_state, self._current_sim_time()
)
def evict_old_views(self, max_age_seconds: float = 3600.0) -> int:
"""Evict old cached views to free memory.
Args:
max_age_seconds: Maximum age for cached views
Returns:
Number of views evicted
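        Example:
            Illustrative maintenance sweep (the one-hour threshold shown is
            just the default)::

                evicted = orchestrator.evict_old_views(max_age_seconds=3600.0)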
"""
return self.memory_manager.evict_old_views(max_age_seconds)
async def shutdown(self) -> None:
"""Shutdown orchestrator and clean up resources."""
self._logger.info("Starting orchestrator shutdown")
# Signal shutdown
self._shutdown_event.set()
# Stop processing task
if self._processing_task:
self._processing_task.cancel()
try:
await self._processing_task
except asyncio.CancelledError:
pass
finally:
self._processing_task = None
# Stop system monitoring task
if hasattr(self, "_system_monitoring_task") and self._system_monitoring_task:
self._system_monitoring_task.cancel()
try:
await self._system_monitoring_task
except asyncio.CancelledError:
pass
finally:
self._system_monitoring_task = None
# Close deduplication store
await self._dedup_store.close()
# Close all agent queues
for queue in self._per_agent_queues.values():
await queue.close()
# Cleanup memory manager
await self.memory_manager.cleanup()
# Clear state
self.agent_handles.clear()
self.observation_policies.clear()
self._per_agent_queues.clear()
self._cancel_tokens.clear()
self._quota_tracker.clear()
self._initialized = False
self._initialize_lock = None
self._logger.info("Orchestrator shutdown complete")
def _start_system_monitoring(self) -> None:
"""Start background system monitoring task."""
async def monitor_system() -> None:
while not self._shutdown_event.is_set():
try:
# Record system metrics
system_monitor.record_memory_usage("orchestrator")
system_monitor.record_cpu_usage("orchestrator")
# Record queue depths
for agent_id, queue in self._per_agent_queues.items():
queue_depth = queue.qsize() if hasattr(queue, "qsize") else 0
record_queue_depth(agent_id, queue_depth)
# Wait before next monitoring cycle
await asyncio.sleep(30.0) # Monitor every 30 seconds
except Exception as e:
self._logger.warning("System monitoring error", error=str(e))
await asyncio.sleep(60.0) # Back off on error
# Store monitor_system function for later use in initialize()
self._monitor_system_func = monitor_system