gunn.core package
Submodules
gunn.core.event_log module
Event log implementation for append-only storage with integrity checking.
This module provides the EventLog class that maintains an immutable sequence of effects with hash chain integrity verification and replay capabilities.
- class gunn.core.event_log.EventLogEntry(**data)
Bases:
BaseModel
Single entry in the event log with integrity checking.
Each entry contains an effect along with metadata for ordering, timing, and hash chain integrity verification.
- Parameters:
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- class gunn.core.event_log.EventLog(world_id='default')
Bases:
object
Append-only event log with hash chain integrity.
Provides thread-safe append operations, replay capabilities, and integrity verification through hash chaining.
Requirements addressed:
- 1.2: Record events in global sequential log with global_seq identifier
- 7.1: Maintain complete sequential log with global_seq numbers
- 7.3: Provide replay capabilities from event log
- 7.5: Validate state consistency using log hash/CRC and sequence gap detection
- Parameters:
world_id (str)
- async append(effect, req_id=None)
Append effect to log with hash chain checksum.
Thread-safe operation that adds a new effect to the log with proper sequencing and integrity checking.
- Args:
effect: Effect to append to the log
req_id: Optional request ID for idempotency tracking
- Returns:
The global_seq assigned to this entry
- Raises:
ValueError: If effect is missing required fields
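The hash chain idea behind append can be illustrated with a minimal, self-contained sketch. It is not gunn's implementation: `SketchLog`, `chain_checksum`, the choice of SHA-256 and the JSON canonicalisation are all assumptions made for this example.

```python
import hashlib
import json


def chain_checksum(effect: dict, prev_checksum: str) -> str:
    """Hash the previous checksum together with a canonical form of the effect."""
    canonical = json.dumps(effect, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256((prev_checksum + canonical).encode("utf-8")).hexdigest()


class SketchLog:
    """Hypothetical in-memory append-only log; entries are plain dicts."""

    def __init__(self) -> None:
        self.entries: list[dict] = []

    def append(self, effect: dict) -> int:
        # Each checksum depends on the previous one, so editing an old
        # entry invalidates every checksum that follows it.
        prev = self.entries[-1]["checksum"] if self.entries else ""
        seq = len(self.entries) + 1  # start at 1 so that 0 can mean "empty log"
        self.entries.append(
            {"global_seq": seq, "effect": effect, "checksum": chain_checksum(effect, prev)}
        )
        return seq

    def verify(self) -> bool:
        """Recompute the chain from the start and compare stored checksums."""
        prev = ""
        for entry in self.entries:
            if entry["checksum"] != chain_checksum(entry["effect"], prev):
                return False
            prev = entry["checksum"]
        return True
```

Because every checksum folds in its predecessor, tampering with one stored effect is detectable by recomputing the chain.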
- get_entries_since(since_seq)
Get entries for replay and catch-up.
Returns all entries with global_seq > since_seq in order.
- Return type:
- Parameters:
since_seq (int)
- Args:
since_seq: Sequence number to start from (exclusive)
- Returns:
List of entries after the specified sequence number
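The exclusive lower bound can be shown with a short sketch. It assumes entries are dicts sorted by ascending `global_seq`; the helper name `entries_since` is hypothetical.

```python
from bisect import bisect_right


def entries_since(entries: list[dict], since_seq: int) -> list[dict]:
    """Return entries whose global_seq is strictly greater than since_seq."""
    seqs = [e["global_seq"] for e in entries]  # assumed sorted ascending
    # bisect_right skips past since_seq itself, which makes the bound exclusive.
    return entries[bisect_right(seqs, since_seq):]
```

A consumer that has already applied everything up to sequence N can pass N directly and receive only what it has not seen.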
- get_latest_seq()
Get the latest sequence number in the log.
- Return type:
- Returns:
Latest global_seq, or 0 if log is empty
- get_entry_count()
Get total number of entries in the log.
- Return type:
- Returns:
Number of entries in the log
- validate_integrity()
Validate complete log integrity.
Performs comprehensive integrity checks including:
- Hash chain validation
- Sequence gap detection
- Corruption analysis
- Returns:
Dictionary with integrity report containing:
- valid: Overall validity boolean
- corrupted_entries: List of corrupted entry indices
- missing_sequences: List of detected sequence gaps
- total_entries: Total number of entries checked
- details: Additional diagnostic information
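As a rough illustration of sequence gap detection and of the report shape listed above, consider the sketch below. The helper names are hypothetical, the report keys are taken from the description, and the empty `details` dict is a placeholder; the real method may compute these differently.

```python
def find_sequence_gaps(seqs: list[int]) -> list[int]:
    """Return every sequence number missing between consecutive entries."""
    missing: list[int] = []
    for prev, cur in zip(seqs, seqs[1:]):
        missing.extend(range(prev + 1, cur))
    return missing


def integrity_report(seqs: list[int], corrupted: list[int]) -> dict:
    """Assemble a report using the keys named in the documentation."""
    missing = find_sequence_gaps(seqs)
    return {
        "valid": not corrupted and not missing,
        "corrupted_entries": corrupted,
        "missing_sequences": missing,
        "total_entries": len(seqs),
        "details": {},  # placeholder for extra diagnostics
    }
```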
- find_entry_by_uuid(effect_uuid)
Find entry by effect UUID.
- Return type:
- Parameters:
effect_uuid (str)
- Args:
effect_uuid: UUID of the effect to find
- Returns:
EventLogEntry if found, None otherwise
- find_entries_by_req_id(req_id)
Find entries by request ID.
- Return type:
- Parameters:
req_id (str)
- Args:
req_id: Request ID to search for
- Returns:
List of entries with matching request ID
- get_entries_by_source(source_id)
Get all entries from a specific source.
- Return type:
- Parameters:
source_id (str)
- Args:
source_id: Source identifier to filter by
- Returns:
List of entries from the specified source
- get_entries_in_time_range(start_time=None, end_time=None, use_sim_time=True)
Get entries within a time range.
- Return type:
- Parameters:
- Args:
start_time: Start time (inclusive), None for no lower bound
end_time: End time (inclusive), None for no upper bound
use_sim_time: If True, filter by sim_time; if False, use wall_time
- Returns:
List of entries within the specified time range
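The inclusive bounds and the `use_sim_time` switch can be sketched as follows. Entries are modelled as dicts with `sim_time` and `wall_time` keys, which is an assumption for this example, and `filter_time_range` is a hypothetical helper.

```python
from typing import Optional


def filter_time_range(
    entries: list[dict],
    start_time: Optional[float] = None,
    end_time: Optional[float] = None,
    use_sim_time: bool = True,
) -> list[dict]:
    """Keep entries whose chosen timestamp lies in [start_time, end_time]."""
    key = "sim_time" if use_sim_time else "wall_time"
    return [
        e
        for e in entries
        if (start_time is None or e[key] >= start_time)
        and (end_time is None or e[key] <= end_time)
    ]
```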
- async compact(keep_entries=1000)
Compact the log by removing old entries.
Keeps the most recent entries and removes older ones to manage memory. This operation maintains integrity by preserving the hash chain.
- Args:
keep_entries: Number of recent entries to keep
- Returns:
Number of entries removed
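One way compaction can keep a hash chain verifiable is to remember the checksum of the last removed entry as an anchor for the first retained one. The sketch below illustrates that idea only; `compact_sketch` and the anchor mechanism are assumptions, not a description of how gunn preserves the chain.

```python
def compact_sketch(
    entries: list[dict], keep_entries: int = 1000
) -> tuple[list[dict], str, int]:
    """Drop all but the newest keep_entries entries.

    Returns (kept_entries, anchor_checksum, removed_count). The anchor is the
    checksum of the last removed entry, or "" when nothing was removed.
    """
    if keep_entries < 0:
        raise ValueError("keep_entries must be non-negative")
    removed = max(0, len(entries) - keep_entries)
    anchor = entries[removed - 1]["checksum"] if removed else ""
    return entries[removed:], anchor, removed
```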
gunn.core.orchestrator module
Core orchestrator for multi-agent simulation.
This module provides the central Orchestrator class that coordinates all system operations including event ingestion, view generation, intent validation, and observation distribution with deterministic ordering.
- class gunn.core.orchestrator.EffectValidator(*args, **kwargs)
Bases:
Protocol
Protocol for validating intents before creating effects.
- validate_intent(intent, world_state)
Validate if intent can be executed.
- Return type:
- Parameters:
intent (Intent)
world_state (WorldState)
- Args:
intent: Intent to validate
world_state: Current world state
- Returns:
True if intent is valid and can be executed
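Because EffectValidator is a Protocol, any object with a matching validate_intent method can serve as a validator without inheriting from it. The sketch below uses a stand-in protocol (`EffectValidatorSketch`) and treats intents as plain dicts with a `kind` key; both are assumptions for illustration.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class EffectValidatorSketch(Protocol):
    """Stand-in for the documented protocol: one validate_intent method."""

    def validate_intent(self, intent: dict[str, Any], world_state: Any) -> bool: ...


class SpeakOnlyValidator:
    """Hypothetical validator that only accepts 'Speak' intents."""

    def validate_intent(self, intent: dict[str, Any], world_state: Any) -> bool:
        return intent.get("kind") == "Speak"
```

Structural typing means `SpeakOnlyValidator` satisfies the protocol purely by shape, which keeps custom validators decoupled from the orchestrator module.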
- class gunn.core.orchestrator.DefaultEffectValidator(max_intents_per_minute=60, max_tokens_per_minute=10000, default_cooldown_seconds=1.0, max_payload_size_bytes=10000, allowed_intent_kinds=None)
Bases:
object
Default implementation of EffectValidator with comprehensive validation.
Provides validation for quota limits, cooldowns, permissions, and world state constraints to ensure intents can be safely executed.
Requirements addressed:
- 1.5: Intent validation through EffectValidator before creating an Effect
- 3.4: Validation for quota limits, cooldowns, and permissions
- 10.3: Structured error codes for validation failures
- Parameters:
- set_agent_permissions(agent_id, permissions)
Set permissions for an agent.
- Args:
agent_id: Agent identifier
permissions: Set of permission strings
- add_agent_permission(agent_id, permission)
Add a permission for an agent.
- Args:
agent_id: Agent identifier
permission: Permission string to add
- remove_agent_permission(agent_id, permission)
Remove a permission for an agent.
- Args:
agent_id: Agent identifier
permission: Permission string to remove
- set_intent_kind_cooldown(intent_kind, cooldown_seconds)
Set cooldown for a specific intent kind.
- Args:
intent_kind: Intent kind to set cooldown for
cooldown_seconds: Cooldown duration in seconds
- validate_intent(intent, world_state)
Validate if intent can be executed.
Performs comprehensive validation including:
- Basic structure validation
- Permission checks
- Quota limit validation
- Cooldown enforcement
- World state constraint checks
- Payload size limits
- Return type:
- Parameters:
intent (Intent)
world_state (WorldState)
- Args:
intent: Intent to validate
world_state: Current world state
- Returns:
True if intent is valid and can be executed
- Raises:
ValidationError: If validation fails with detailed reasons
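Two of the checks listed above, the per-minute quota and the per-kind cooldown, can be sketched with a sliding window. This is an illustrative stand-in, not the class's actual logic: `QuotaCooldownSketch` is hypothetical, it returns a bool instead of raising ValidationError, and the caller supplies the timestamp so the behaviour is deterministic.

```python
from collections import defaultdict, deque


class QuotaCooldownSketch:
    """Sliding-window quota plus per-(agent, kind) cooldown."""

    def __init__(self, max_intents_per_minute: int = 60,
                 default_cooldown_seconds: float = 1.0) -> None:
        self.max_intents_per_minute = max_intents_per_minute
        self.default_cooldown_seconds = default_cooldown_seconds
        self._recent = defaultdict(deque)  # agent_id -> accepted timestamps
        self._last_by_kind = {}            # (agent_id, kind) -> last accepted time

    def check(self, agent_id: str, kind: str, now: float) -> bool:
        window = self._recent[agent_id]
        # Drop timestamps that have left the 60-second window.
        while window and now - window[0] >= 60.0:
            window.popleft()
        if len(window) >= self.max_intents_per_minute:
            return False  # quota exhausted
        last = self._last_by_kind.get((agent_id, kind))
        if last is not None and now - last < self.default_cooldown_seconds:
            return False  # still cooling down for this intent kind
        window.append(now)
        self._last_by_kind[(agent_id, kind)] = now
        return True
```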
- class gunn.core.orchestrator.AgentHandle(agent_id, orchestrator)
Bases:
object
Per-agent interface for observation and intent submission.
Provides an isolated interface for each agent with view sequence tracking and non-blocking operations.
- Parameters:
agent_id (str)
orchestrator (Orchestrator)
- async next_observation()
Get next observation delta from orchestrator's timed queue.
- Return type:
- Returns:
Next observation delta when available
- Raises:
RuntimeError: If agent is not registered or queue is closed
- async submit_intent(intent)
Submit intent for validation and processing.
- Args:
intent: Intent to submit
- Returns:
Request ID for tracking the intent
- class gunn.core.orchestrator.OrchestratorConfig(max_agents=100, staleness_threshold=0, debounce_ms=100.0, deadline_ms=5000.0, token_budget=1000, backpressure_policy='defer', default_priority=0, dedup_ttl_minutes=60, max_dedup_entries=10000, dedup_cleanup_interval_minutes=10, dedup_warmup_minutes=5, max_queue_depth=100, quota_intents_per_minute=60, quota_tokens_per_minute=10000, use_in_memory_dedup=False, processing_idle_shutdown_ms=250.0, max_log_entries=10000, view_cache_size=1000, compaction_threshold=5000, snapshot_interval=1000, max_snapshots=10, memory_check_interval_seconds=60.0, auto_compaction_enabled=True, concurrent_processing_config=None)
Bases:
object
Configuration for the Orchestrator.
- Parameters:
max_agents (int)
staleness_threshold (int)
debounce_ms (float)
deadline_ms (float)
token_budget (int)
backpressure_policy (str)
default_priority (int)
dedup_ttl_minutes (int)
max_dedup_entries (int)
dedup_cleanup_interval_minutes (int)
dedup_warmup_minutes (int)
max_queue_depth (int)
quota_intents_per_minute (int)
quota_tokens_per_minute (int)
use_in_memory_dedup (bool)
processing_idle_shutdown_ms (float)
max_log_entries (int)
view_cache_size (int)
compaction_threshold (int)
snapshot_interval (int)
max_snapshots (int)
memory_check_interval_seconds (float)
auto_compaction_enabled (bool)
concurrent_processing_config (ConcurrentProcessingConfig | None)
- class gunn.core.orchestrator.Orchestrator(config, world_id='default', effect_validator=None)
Bases:
object
Central coordinator for multi-agent simulation.
Coordinates all system operations including event ingestion and ordering, view generation and delta distribution, intent validation and effect creation, and cancellation token management.
Requirements addressed:
- 1.1: Create WorldState as single source of truth
- 1.3: Generate View based on agent's observation policy
- 1.4: Process events using deterministic ordering
- 9.1: Deterministic ordering using (sim_time, priority, source_id, uuid) tuple
- 9.3: Fixed tie-breaker: priority > source > UUID lexicographic
- Parameters:
config (OrchestratorConfig)
world_id (str)
effect_validator (EffectValidator | None)
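The deterministic ordering tuple from requirements 9.1 and 9.3 can be illustrated with a sort key. `PendingEffect` and `ordering_key` are hypothetical names, and the direction of the priority comparison (higher priority first) is an assumption; the source only states the tie-breaker order.

```python
from typing import NamedTuple


class PendingEffect(NamedTuple):
    sim_time: float
    priority: int
    source_id: str
    uuid: str


def ordering_key(e: PendingEffect) -> tuple:
    # Assumption: a higher priority value is processed first, so it is negated.
    # source_id and uuid then break remaining ties lexicographically.
    return (e.sim_time, -e.priority, e.source_id, e.uuid)
```

Sorting with a total order like this gives the same result on every run regardless of arrival order, which is what makes replay reproducible.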
- set_sim_time_authority(authority)
Set which adapter controls sim_time.
- Args:
authority: Authority for sim_time ("unity", "unreal", or "none")
- async register_agent(agent_id, policy, permissions=None)
Register a new agent with observation policy and permissions.
- Return type:
- Parameters:
agent_id (str)
policy (ObservationPolicy)
- Args:
agent_id: Unique identifier for the agent
policy: Observation policy for this agent
permissions: Set of permissions for the agent. If None, uses default permissions.
- Returns:
AgentHandle for the registered agent
- Raises:
ValueError: If agent_id is empty or already registered
RuntimeError: If maximum agents exceeded
- register_effect_handler(effect_kind, handler)
Register a custom effect handler for a specific effect kind.
This enables domain-specific effect types beyond the built-in kinds (Move, Speak, Interact, EntityCreated, EntityRemoved, MessageEmitted).
- Args:
effect_kind: The effect kind string to handle (e.g., "Attack", "Heal")
handler: Async function that takes (effect, world_state) and applies the effect
- Example:
```python
async def handle_attack(effect: Effect, world_state: WorldState) -> None:
    attacker_id = effect["source_id"]
    target_id = effect["payload"]["target_id"]
    damage = effect["payload"]["damage"]
    # Apply damage logic to world_state
    ...

orchestrator.register_effect_handler("Attack", handle_attack)
```
- Notes:
Handlers are called after built-in effect kinds are checked
Handlers should update world_state in-place
Handlers should raise exceptions on failure (will be logged)
Multiple handlers can be registered for different effect kinds
- unregister_effect_handler(effect_kind)
Unregister a custom effect handler.
- Args:
effect_kind: The effect kind to unregister
- Raises:
KeyError: If no handler is registered for this effect kind
- get_registered_effect_kinds()
Get list of all registered custom effect kinds.
- Returns:
List of effect kind strings that have registered handlers
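The register, unregister and list operations amount to a small handler registry keyed by effect kind. The sketch below shows one possible shape; `HandlerRegistrySketch`, its `apply` method and the dict-based world state are all assumptions, not the Orchestrator's internals.

```python
import asyncio
from typing import Awaitable, Callable

Handler = Callable[[dict, dict], Awaitable[None]]


class HandlerRegistrySketch:
    """Hypothetical registry mapping effect kinds to async handlers."""

    def __init__(self) -> None:
        self._handlers: dict[str, Handler] = {}

    def register_effect_handler(self, effect_kind: str, handler: Handler) -> None:
        self._handlers[effect_kind] = handler

    def unregister_effect_handler(self, effect_kind: str) -> None:
        del self._handlers[effect_kind]  # raises KeyError if not registered

    def get_registered_effect_kinds(self) -> list[str]:
        return list(self._handlers)

    async def apply(self, effect: dict, world_state: dict) -> None:
        handler = self._handlers.get(effect["kind"])
        if handler is None:
            raise ValueError(f"no handler for effect kind {effect['kind']!r}")
        await handler(effect, world_state)  # handler mutates world_state in place


async def handle_attack(effect: dict, world_state: dict) -> None:
    target = effect["payload"]["target_id"]
    world_state["hp"][target] -= effect["payload"]["damage"]
```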
- set_agent_backpressure_policy(agent_id, policy_name)
Set backpressure policy for a specific agent.
- Args:
agent_id: Agent identifier
policy_name: Backpressure policy name (defer, shed_oldest, drop_newest)
- Raises:
ValueError: If agent_id is not registered or policy_name is invalid
Requirements addressed:
- 10.2: Configurable backpressure policies per agent class
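The three policy names suggest different reactions to a full queue. The sketch below shows one plausible reading; only the names come from the documentation, while the semantics (defer rejects the caller, shed_oldest evicts the head, drop_newest discards the incoming item), `enqueue_with_policy` and `QueueFullError` are assumptions.

```python
from collections import deque


class QueueFullError(Exception):
    """Hypothetical error asking the caller to retry later."""


def enqueue_with_policy(queue: deque, item, max_depth: int, policy: str) -> bool:
    """Add item to queue; return True if it was stored."""
    if len(queue) < max_depth:
        queue.append(item)
        return True
    if policy == "defer":
        raise QueueFullError("queue full, retry later")
    if policy == "shed_oldest":
        queue.popleft()  # make room by dropping the oldest item
        queue.append(item)
        return True
    if policy == "drop_newest":
        return False  # silently discard the incoming item
    raise ValueError(f"unknown backpressure policy: {policy}")
```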
- async broadcast_event(draft)
Create complete effect from draft and distribute observations.
Converts EffectDraft to complete Effect with priority completion, updates world state, generates observation deltas for affected agents, and delivers observations using timed queues with latency models.
- Return type:
- Parameters:
draft (EffectDraft)
- Args:
draft: Effect draft to process and broadcast
- Raises:
ValueError: If draft is missing required fields
Requirements addressed:
- 2.2: Generate ObservationDelta patches for affected agents
- 2.5: Incremental updates via ObservationDelta patches
- 6.4: ObservationDelta delivery latency ≤ 20ms
- 6.5: Timed delivery using per-agent TimedQueues with latency models
- async submit_intent(intent)
Two-phase commit: idempotency → quota → backpressure → priority → fairness → validator → commit.
- Args:
intent: Intent to process
- Returns:
Request ID for tracking
- Raises:
ValueError: If intent is invalid
StaleContextError: If intent context is stale
QuotaExceededError: If agent quota is exceeded
BackpressureError: If backpressure limits are exceeded
ValidationError: If intent validation fails
- async submit_intents(intents, sim_time=None)
Submit multiple intents for simultaneous processing at the same sim_time.
This method enables true simultaneous intent execution within a single tick, maintaining deterministic ordering through (sim_time, priority, source_id, uuid).
- Args:
intents: List of intents to process simultaneously
sim_time: Optional explicit sim_time for all intents (default: current time)
- Returns:
List of request IDs for tracking
- Raises:
ValueError: If intents list is empty or contains invalid intents
StaleContextError: If any intent context is stale
QuotaExceededError: If any agent quota is exceeded
BackpressureError: If backpressure limits are exceeded
ValidationError: If any intent validation fails
- Notes:
All intents are validated before any are enqueued
If any validation fails, none are processed (atomic batch)
Intents are ordered by priority within the same sim_time
Use this for "speak + move" or other simultaneous actions
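The atomic batch behaviour in the notes above (validate everything first, enqueue only if all pass) can be sketched as below. `submit_batch_sketch`, the dict-shaped intents, the caller-supplied `validate` callable and the assumption that higher priority sorts first are all illustrative.

```python
from typing import Callable


def submit_batch_sketch(
    intents: list[dict],
    validate: Callable[[dict], None],
    queue: list[dict],
) -> list[str]:
    """Validate every intent, then enqueue all of them or none."""
    if not intents:
        raise ValueError("intents list is empty")
    # Phase 1: validation only. Any exception aborts before the queue is touched.
    for intent in intents:
        validate(intent)
    # Phase 2: enqueue, ordered by priority (assumed: higher first, stable sort).
    queue.extend(sorted(intents, key=lambda i: -i["priority"]))
    return [i["req_id"] for i in intents]
```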
- async submit_intents_batch(intents, sim_time=None, processing_mode=None, timeout=None)
Submit multiple intents using concurrent processing capabilities.
This method provides enhanced batch processing with configurable concurrency modes for improved performance in multi-agent scenarios.
- Return type:
- Parameters:
sim_time (float | None)
processing_mode (ProcessingMode | None)
timeout (float | None)
- Args:
intents: List of intents to process
sim_time: Optional explicit sim_time for all intents
processing_mode: Processing mode (sequential, concurrent, deterministic_concurrent)
timeout: Timeout for concurrent operations
- Returns:
BatchResult containing effects, errors, and processing metadata
- Raises:
ValueError: If intents list is empty or contains invalid intents
asyncio.TimeoutError: If processing exceeds timeout
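The combination of concurrent processing, collected errors and an overall timeout maps naturally onto standard asyncio primitives. The sketch below uses `asyncio.gather` with `return_exceptions=True` inside `asyncio.wait_for`; `process_concurrently` and the per-intent `handler` are hypothetical, and the plain result list only approximates what a BatchResult might hold.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


async def process_concurrently(
    intents: list,
    handler: Callable[[Any], Awaitable[Any]],
    timeout: Optional[float] = None,
) -> list:
    """Run handler over all intents concurrently; results keep input order.

    Failures are returned in place as exception objects. If the whole batch
    exceeds timeout, asyncio.TimeoutError is raised and pending work is cancelled.
    """
    if not intents:
        raise ValueError("intents list is empty")
    batch = asyncio.gather(*(handler(i) for i in intents), return_exceptions=True)
    return await asyncio.wait_for(batch, timeout)
```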
- issue_cancel_token(agent_id, req_id, context_digest=None)
Issue cancellation token for generation tracking.
Creates a new cancellation token with tuple key tracking for the given agent and request. The token can be used to cancel long-running operations like LLM generation when context becomes stale.
- Return type:
- Parameters:
- Args:
agent_id: Agent identifier
req_id: Request identifier
context_digest: Optional context digest for staleness detection
- Returns:
CancelToken for tracking cancellation
- Raises:
ValueError: If agent_id or req_id is empty
Requirements addressed:
- 4.1: Issue cancel_token with current context_digest
- async cancel_if_stale(agent_id, req_id, new_view_seq)
Check staleness and cancel if needed with debounce logic.
Evaluates whether the context has become stale based on the staleness threshold and applies debounce logic to prevent rapid successive interruptions.
- Args:
agent_id: Agent identifier
req_id: Request identifier
new_view_seq: New view sequence number to check against
- Returns:
True if cancellation occurred
Requirements addressed:
- 4.2: Evaluate staleness using latest_view_seq > context_seq + staleness_threshold
- 4.3: Cancel current generation when context becomes stale
- 4.7: Suppress rapid successive interruptions within debounce window
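The staleness rule from requirement 4.2 and the debounce from 4.7 can be sketched together. The inequality is taken from the text; `DebouncedCanceller`, the explicit `now_ms` argument and the single shared debounce timer are assumptions made to keep the example deterministic.

```python
from typing import Optional


def is_stale(latest_view_seq: int, context_seq: int, staleness_threshold: int = 0) -> bool:
    """Requirement 4.2: latest_view_seq > context_seq + staleness_threshold."""
    return latest_view_seq > context_seq + staleness_threshold


class DebouncedCanceller:
    """Hypothetical helper that suppresses cancellations inside a debounce window."""

    def __init__(self, staleness_threshold: int = 0, debounce_ms: float = 100.0) -> None:
        self.staleness_threshold = staleness_threshold
        self.debounce_ms = debounce_ms
        self._last_cancel_ms: Optional[float] = None

    def cancel_if_stale(self, context_seq: int, new_view_seq: int, now_ms: float) -> bool:
        if not is_stale(new_view_seq, context_seq, self.staleness_threshold):
            return False
        if (self._last_cancel_ms is not None
                and now_ms - self._last_cancel_ms < self.debounce_ms):
            return False  # too soon after the previous interruption
        self._last_cancel_ms = now_ms
        return True
```

With the default threshold of 0, any newer view makes the context stale; raising the threshold tolerates a few intervening updates before interrupting generation.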
- set_agent_interrupt_policy(agent_id, policy)
Set interrupt policy for an agent.
- Args:
agent_id: Agent identifier
policy: Interrupt policy ("always" or "only_conflict")
- Raises:
ValueError: If policy is not valid
Requirements addressed:
- 4.5: interrupt_on_new_info policy "always" triggers on any new information
- 4.6: interrupt_on_new_info policy "only_conflict" triggers only on conflicts
- get_agent_interrupt_policy(agent_id)
Get interrupt policy for an agent.
- Args:
agent_id: Agent identifier
- Returns:
Interrupt policy ("always" or "only_conflict"), defaults to "always"
- async check_and_cancel_stale_tokens(effect)
Check all active tokens for staleness and cancel if needed.
This method is called during observation distribution to automatically cancel tokens when new events make their context stale.
- Args:
effect: New effect that might make contexts stale
- Returns:
List of request IDs that were cancelled
Requirements addressed:
- 4.2: Evaluate staleness when new events occur during generation
- 4.3: Cancel current generation when context becomes stale
- 4.4: Provide cancellation reason to the agent
- cleanup_cancelled_tokens()
Clean up cancelled tokens to prevent memory leaks.
- Return type:
- Returns:
Number of tokens cleaned up
- get_active_cancel_tokens()
Get active cancel tokens grouped by agent.
- Returns:
Dictionary mapping agent_id to list of active req_ids
- get_agent_count()
Get number of registered agents.
- Return type:
- Returns:
Number of currently registered agents
- get_latest_seq()
Get latest global sequence number.
- Return type:
- Returns:
Latest global sequence number
- get_processing_stats()
Get intent processing statistics.
- Returns:
Dictionary with processing statistics
- get_memory_stats()
Get comprehensive memory management statistics.
- Returns:
Dictionary with detailed memory statistics including log entries, snapshots, view cache, and compaction information.
- async force_compaction()
Force log compaction regardless of thresholds.
- Return type:
- Returns:
Number of entries removed during compaction
- async create_snapshot()
Force creation of a WorldState snapshot.
- Return type:
- Returns:
Created snapshot
gunn.core.test_effect_validator module
Module contents
- class gunn.core.AgentHandle(agent_id, orchestrator)
Bases:
object
Per-agent interface for observation and intent submission.
Provides an isolated interface for each agent with view sequence tracking and non-blocking operations.
- Parameters:
agent_id (str)
orchestrator (Orchestrator)
- get_view_seq()
Get current view sequence number.
- Return type:
- Returns:
Current view sequence number for this agent
- class gunn.core.DefaultEffectValidator(max_intents_per_minute=60, max_tokens_per_minute=10000, default_cooldown_seconds=1.0, max_payload_size_bytes=10000, allowed_intent_kinds=None)
Bases:
object
Default implementation of EffectValidator with comprehensive validation.
Provides validation for quota limits, cooldowns, permissions, and world state constraints to ensure intents can be safely executed.
Requirements addressed:
- 1.5: Intent validation through EffectValidator before creating an Effect
- 3.4: Validation for quota limits, cooldowns, and permissions
- 10.3: Structured error codes for validation failures
- Parameters:
- add_agent_permission(agent_id, permission)
Add a permission for an agent.
- Args:
agent_id: Agent identifier
permission: Permission string to add
- get_agent_quota_status(agent_id)
Get current quota status for an agent.
- Args:
agent_id: Agent identifier
- Returns:
Dictionary with quota status information
- remove_agent_permission(agent_id, permission)
Remove a permission for an agent.
- Args:
agent_id: Agent identifier
permission: Permission string to remove
- set_agent_permissions(agent_id, permissions)
Set permissions for an agent.
- Args:
agent_id: Agent identifier
permissions: Set of permission strings
- set_intent_kind_cooldown(intent_kind, cooldown_seconds)
Set cooldown for a specific intent kind.
- Args:
intent_kind: Intent kind to set cooldown for
cooldown_seconds: Cooldown duration in seconds
- validate_intent(intent, world_state)
Validate if intent can be executed.
Performs comprehensive validation including:
- Basic structure validation
- Permission checks
- Quota limit validation
- Cooldown enforcement
- World state constraint checks
- Payload size limits
- Return type:
- Parameters:
intent (Intent)
world_state (WorldState)
- Args:
intent: Intent to validate
world_state: Current world state
- Returns:
True if intent is valid and can be executed
- Raises:
ValidationError: If validation fails with detailed reasons
- class gunn.core.EffectValidator(*args, **kwargs)
Bases:
Protocol
Protocol for validating intents before creating effects.
- set_agent_permissions(agent_id, permissions)
Set permissions for an agent.
- Args:
agent_id: Agent identifier
permissions: Set of permission strings
- validate_intent(intent, world_state)
Validate if intent can be executed.
- Return type:
- Parameters:
intent (Intent)
world_state (WorldState)
- Args:
intent: Intent to validate
world_state: Current world state
- Returns:
True if intent is valid and can be executed
- class gunn.core.EventLog(world_id='default')
Bases:
object
Append-only event log with hash chain integrity.
Provides thread-safe append operations, replay capabilities, and integrity verification through hash chaining.
Requirements addressed:
- 1.2: Record events in global sequential log with global_seq identifier
- 7.1: Maintain complete sequential log with global_seq numbers
- 7.3: Provide replay capabilities from event log
- 7.5: Validate state consistency using log hash/CRC and sequence gap detection
- Parameters:
world_id (str)
- async append(effect, req_id=None)
Append effect to log with hash chain checksum.
Thread-safe operation that adds a new effect to the log with proper sequencing and integrity checking.
- Args:
effect: Effect to append to the log
req_id: Optional request ID for idempotency tracking
- Returns:
The global_seq assigned to this entry
- Raises:
ValueError: If effect is missing required fields
- async compact(keep_entries=1000)
Compact the log by removing old entries.
Keeps the most recent entries and removes older ones to manage memory. This operation maintains integrity by preserving the hash chain.
- Args:
keep_entries: Number of recent entries to keep
- Returns:
Number of entries removed
- find_entries_by_req_id(req_id)
Find entries by request ID.
- Return type:
- Parameters:
req_id (str)
- Args:
req_id: Request ID to search for
- Returns:
List of entries with matching request ID
- find_entry_by_uuid(effect_uuid)
Find entry by effect UUID.
- Return type:
- Parameters:
effect_uuid (str)
- Args:
effect_uuid: UUID of the effect to find
- Returns:
EventLogEntry if found, None otherwise
- get_entries_by_source(source_id)
Get all entries from a specific source.
- Return type:
- Parameters:
source_id (str)
- Args:
source_id: Source identifier to filter by
- Returns:
List of entries from the specified source
- get_entries_in_time_range(start_time=None, end_time=None, use_sim_time=True)
Get entries within a time range.
- Return type:
- Parameters:
- Args:
start_time: Start time (inclusive), None for no lower bound
end_time: End time (inclusive), None for no upper bound
use_sim_time: If True, filter by sim_time; if False, use wall_time
- Returns:
List of entries within the specified time range
- get_entries_since(since_seq)
Get entries for replay and catch-up.
Returns all entries with global_seq > since_seq in order.
- Return type:
- Parameters:
since_seq (int)
- Args:
since_seq: Sequence number to start from (exclusive)
- Returns:
List of entries after the specified sequence number
- get_entry_count()
Get total number of entries in the log.
- Return type:
- Returns:
Number of entries in the log
- get_latest_seq()
Get the latest sequence number in the log.
- Return type:
- Returns:
Latest global_seq, or 0 if log is empty
- get_stats()
Get log statistics.
- Returns:
Dictionary with log statistics including entry counts, time ranges, and integrity status
- validate_integrity()
Validate complete log integrity.
Performs comprehensive integrity checks including:
- Hash chain validation
- Sequence gap detection
- Corruption analysis
- Returns:
Dictionary with integrity report containing:
- valid: Overall validity boolean
- corrupted_entries: List of corrupted entry indices
- missing_sequences: List of detected sequence gaps
- total_entries: Total number of entries checked
- details: Additional diagnostic information
- class gunn.core.EventLogEntry(**data)
Bases:
BaseModel
Single entry in the event log with integrity checking.
Each entry contains an effect along with metadata for ordering, timing, and hash chain integrity verification.
- Parameters:
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- class gunn.core.Orchestrator(config, world_id='default', effect_validator=None)
Bases:
object
Central coordinator for multi-agent simulation.
Coordinates all system operations including event ingestion and ordering, view generation and delta distribution, intent validation and effect creation, and cancellation token management.
Requirements addressed:
- 1.1: Create WorldState as single source of truth
- 1.3: Generate View based on agent's observation policy
- 1.4: Process events using deterministic ordering
- 9.1: Deterministic ordering using (sim_time, priority, source_id, uuid) tuple
- 9.3: Fixed tie-breaker: priority > source > UUID lexicographic
- Parameters:
config (OrchestratorConfig)
world_id (str)
effect_validator (EffectValidator | None)
- async broadcast_event(draft)
Create complete effect from draft and distribute observations.
Converts EffectDraft to complete Effect with priority completion, updates world state, generates observation deltas for affected agents, and delivers observations using timed queues with latency models.
- Return type:
- Parameters:
draft (EffectDraft)
- Args:
draft: Effect draft to process and broadcast
- Raises:
ValueError: If draft is missing required fields
Requirements addressed:
- 2.2: Generate ObservationDelta patches for affected agents
- 2.5: Incremental updates via ObservationDelta patches
- 6.4: ObservationDelta delivery latency ≤ 20ms
- 6.5: Timed delivery using per-agent TimedQueues with latency models
- async cancel_if_stale(agent_id, req_id, new_view_seq)
Check staleness and cancel if needed with debounce logic.
Evaluates whether the context has become stale based on the staleness threshold and applies debounce logic to prevent rapid successive interruptions.
- Args:
agent_id: Agent identifier
req_id: Request identifier
new_view_seq: New view sequence number to check against
- Returns:
True if cancellation occurred
Requirements addressed:
- 4.2: Evaluate staleness using latest_view_seq > context_seq + staleness_threshold
- 4.3: Cancel current generation when context becomes stale
- 4.7: Suppress rapid successive interruptions within debounce window
- async check_and_cancel_stale_tokens(effect)
Check all active tokens for staleness and cancel if needed.
This method is called during observation distribution to automatically cancel tokens when new events make their context stale.
- Args:
effect: New effect that might make contexts stale
- Returns:
List of request IDs that were cancelled
Requirements addressed:
- 4.2: Evaluate staleness when new events occur during generation
- 4.3: Cancel current generation when context becomes stale
- 4.4: Provide cancellation reason to the agent
- cleanup_cancelled_tokens()
Clean up cancelled tokens to prevent memory leaks.
- Return type:
- Returns:
Number of tokens cleaned up
- async create_snapshot()
Force creation of a WorldState snapshot.
- Return type:
- Returns:
Created snapshot
- evict_old_views(max_age_seconds=3600.0)
Evict old cached views to free memory.
- Args:
max_age_seconds: Maximum age for cached views
- Returns:
Number of views evicted
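Age-based eviction of a view cache can be sketched as follows. The cache layout (key mapped to a `(cached_at, view)` tuple), the explicit `now` argument and the helper name `evict_old_views_sketch` are assumptions for illustration only.

```python
def evict_old_views_sketch(cache: dict, now: float, max_age_seconds: float = 3600.0) -> int:
    """Delete cached views older than max_age_seconds; return how many were removed."""
    stale_keys = [
        key for key, (cached_at, _view) in cache.items()
        if now - cached_at > max_age_seconds
    ]
    # Collect keys first so the dict is not mutated while being iterated.
    for key in stale_keys:
        del cache[key]
    return len(stale_keys)
```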
- async force_compaction()
Force log compaction regardless of thresholds.
- Return type:
- Returns:
Number of entries removed during compaction
- get_active_cancel_tokens()
Get active cancel tokens grouped by agent.
- Returns:
Dictionary mapping agent_id to list of active req_ids
- get_agent_count()
Get number of registered agents.
- Return type:
- Returns:
Number of currently registered agents
- get_agent_interrupt_policy(agent_id)
Get interrupt policy for an agent.
- Args:
agent_id: Agent identifier
- Returns:
Interrupt policy ("always" or "only_conflict"), defaults to "always"
- get_latest_seq()
Get latest global sequence number.
- Return type:
- Returns:
Latest global sequence number
- get_memory_stats()
Get comprehensive memory management statistics.
- Returns:
Dictionary with detailed memory statistics including log entries, snapshots, view cache, and compaction information.
- get_processing_stats()
Get intent processing statistics.
- Returns:
Dictionary with processing statistics
- get_registered_effect_kinds()
Get list of all registered custom effect kinds.
- Returns:
List of effect kind strings that have registered handlers
- issue_cancel_token(agent_id, req_id, context_digest=None)
Issue cancellation token for generation tracking.
Creates a new cancellation token with tuple key tracking for the given agent and request. The token can be used to cancel long-running operations like LLM generation when context becomes stale.
- Return type:
- Parameters:
- Args:
agent_id: Agent identifier
req_id: Request identifier
context_digest: Optional context digest for staleness detection
- Returns:
CancelToken for tracking cancellation
- Raises:
ValueError: If agent_id or req_id is empty
Requirements addressed:
- 4.1: Issue cancel_token with current context_digest
- async register_agent(agent_id, policy, permissions=None)
Register a new agent with observation policy and permissions.
- Return type:
- Parameters:
agent_id (str)
policy (ObservationPolicy)
- Args:
agent_id: Unique identifier for the agent
policy: Observation policy for this agent
permissions: Set of permissions for the agent. If None, uses default permissions.
- Returns:
AgentHandle for the registered agent
- Raises:
ValueError: If agent_id is empty or already registered
RuntimeError: If maximum agents exceeded
- register_effect_handler(effect_kind, handler)
Register a custom effect handler for a specific effect kind.
This enables domain-specific effect types beyond the built-in kinds (Move, Speak, Interact, EntityCreated, EntityRemoved, MessageEmitted).
- Args:
effect_kind: The effect kind string to handle (e.g., "Attack", "Heal")
handler: Async function that takes (effect, world_state) and applies the effect
- Example:
```python
async def handle_attack(effect: Effect, world_state: WorldState) -> None:
    attacker_id = effect["source_id"]
    target_id = effect["payload"]["target_id"]
    damage = effect["payload"]["damage"]
    # Apply damage logic to world_state
    ...

orchestrator.register_effect_handler("Attack", handle_attack)
```
- Notes:
Handlers are called after built-in effect kinds are checked
Handlers should update world_state in-place
Handlers should raise exceptions on failure (will be logged)
Multiple handlers can be registered for different effect kinds
- set_agent_backpressure_policy(agent_id, policy_name)
Set backpressure policy for a specific agent.
- Args:
agent_id: Agent identifier
policy_name: Backpressure policy name (defer, shed_oldest, drop_newest)
- Raises:
ValueError: If agent_id is not registered or policy_name is invalid
Requirements addressed:
- 10.2: Configurable backpressure policies per agent class
- set_agent_interrupt_policy(agent_id, policy)
Set interrupt policy for an agent.
- Args:
agent_id: Agent identifier
policy: Interrupt policy ("always" or "only_conflict")
- Raises:
ValueError: If policy is not valid
Requirements addressed:
- 4.5: interrupt_on_new_info policy "always" triggers on any new information
- 4.6: interrupt_on_new_info policy "only_conflict" triggers only on conflicts
- set_sim_time_authority(authority)
Set which adapter controls sim_time.
- Args:
authority: Authority for sim_time ("unity", "unreal", or "none")
- async submit_intent(intent)
Two-phase commit: idempotency → quota → backpressure → priority → fairness → validator → commit.
- Args:
intent: Intent to process
- Returns:
Request ID for tracking
- Raises:
ValueError: If intent is invalid
StaleContextError: If intent context is stale
QuotaExceededError: If agent quota is exceeded
BackpressureError: If backpressure limits are exceeded
ValidationError: If intent validation fails
- async submit_intents(intents, sim_time=None)
Submit multiple intents for simultaneous processing at the same sim_time.
This method enables true simultaneous intent execution within a single tick, maintaining deterministic ordering through (sim_time, priority, source_id, uuid).
- Args:
intents: List of intents to process simultaneously
sim_time: Optional explicit sim_time for all intents (default: current time)
- Returns:
List of request IDs for tracking
- Raises:
ValueError: If intents list is empty or contains invalid intents
StaleContextError: If any intent context is stale
QuotaExceededError: If any agent quota is exceeded
BackpressureError: If backpressure limits are exceeded
ValidationError: If any intent validation fails
- Notes:
All intents are validated before any are enqueued
If any validation fails, none are processed (atomic batch)
Intents are ordered by priority within the same sim_time
Use this for "speak + move" or other simultaneous actions
- async submit_intents_batch(intents, sim_time=None, processing_mode=None, timeout=None)
Submit multiple intents using concurrent processing capabilities.
This method provides enhanced batch processing with configurable concurrency modes for improved performance in multi-agent scenarios.
- Return type:
- Parameters:
sim_time (float | None)
processing_mode (ProcessingMode | None)
timeout (float | None)
- Args:
intents: List of intents to process
sim_time: Optional explicit sim_time for all intents
processing_mode: Processing mode (sequential, concurrent, deterministic_concurrent)
timeout: Timeout for concurrent operations
- Returns:
BatchResult containing effects, errors, and processing metadata
- Raises:
ValueError: If intents list is empty or contains invalid intents
asyncio.TimeoutError: If processing exceeds timeout
- class gunn.core.OrchestratorConfig(max_agents=100, staleness_threshold=0, debounce_ms=100.0, deadline_ms=5000.0, token_budget=1000, backpressure_policy='defer', default_priority=0, dedup_ttl_minutes=60, max_dedup_entries=10000, dedup_cleanup_interval_minutes=10, dedup_warmup_minutes=5, max_queue_depth=100, quota_intents_per_minute=60, quota_tokens_per_minute=10000, use_in_memory_dedup=False, processing_idle_shutdown_ms=250.0, max_log_entries=10000, view_cache_size=1000, compaction_threshold=5000, snapshot_interval=1000, max_snapshots=10, memory_check_interval_seconds=60.0, auto_compaction_enabled=True, concurrent_processing_config=None)
Bases:
object
Configuration for the Orchestrator.
- Parameters:
max_agents (int)
staleness_threshold (int)
debounce_ms (float)
deadline_ms (float)
token_budget (int)
backpressure_policy (str)
default_priority (int)
dedup_ttl_minutes (int)
max_dedup_entries (int)
dedup_cleanup_interval_minutes (int)
dedup_warmup_minutes (int)
max_queue_depth (int)
quota_intents_per_minute (int)
quota_tokens_per_minute (int)
use_in_memory_dedup (bool)
processing_idle_shutdown_ms (float)
max_log_entries (int)
view_cache_size (int)
compaction_threshold (int)
snapshot_interval (int)
max_snapshots (int)
memory_check_interval_seconds (float)
auto_compaction_enabled (bool)
concurrent_processing_config (ConcurrentProcessingConfig | None)