gunn package¶
Subpackages¶
- gunn.adapters package
- gunn.cli package
- gunn.config package
- Submodules
- gunn.config.config module
ConfigError
FeatureFlags
FeatureFlags.latency_simulation
FeatureFlags.backpressure_management
FeatureFlags.staleness_detection
FeatureFlags.cancellation_tokens
FeatureFlags.telemetry
FeatureFlags.metrics_export
FeatureFlags.structured_logging
FeatureFlags.pii_redaction
FeatureFlags.memory_management
FeatureFlags.log_compaction
FeatureFlags.view_caching
FeatureFlags.authentication
FeatureFlags.authorization
FeatureFlags.rate_limiting
FeatureFlags.distributed_mode
FeatureFlags.gpu_acceleration
FeatureFlags.from_env()
FeatureFlags.to_dict()
LoggingConfig
MetricsConfig
SecurityConfig
DatabaseConfig
Config
load_config_from_file()
load_config_from_env()
load_config()
validate_config()
- gunn.config.deployment module
HealthCheckConfig
HealthCheckConfig.enabled
HealthCheckConfig.endpoint
HealthCheckConfig.readiness_endpoint
HealthCheckConfig.liveness_endpoint
HealthCheckConfig.timeout_seconds
HealthCheckConfig.check_interval_seconds
HealthCheckConfig.check_database
HealthCheckConfig.check_event_log
HealthCheckConfig.check_orchestrator
HealthCheckConfig.check_memory_usage
HealthCheckConfig.max_memory_usage_percent
HealthCheckConfig.max_response_time_ms
HealthCheckConfig.max_queue_depth
HealthCheckConfig.model_config
HealthStatus
DeploymentConfig
DeploymentConfig.service_name
DeploymentConfig.service_version
DeploymentConfig.instance_id
DeploymentConfig.host
DeploymentConfig.port
DeploymentConfig.workers
DeploymentConfig.health
DeploymentConfig.shutdown_timeout_seconds
DeploymentConfig.max_memory_mb
DeploymentConfig.max_cpu_percent
DeploymentConfig.enable_profiling
DeploymentConfig.profiling_interval_seconds
DeploymentConfig.model_config
HealthChecker
GracefulShutdownHandler
- gunn.config.environment module
- Module contents
Config
ConfigError
DeploymentConfig
DeploymentConfig.model_config
DeploymentConfig.service_name
DeploymentConfig.service_version
DeploymentConfig.instance_id
DeploymentConfig.host
DeploymentConfig.port
DeploymentConfig.workers
DeploymentConfig.health
DeploymentConfig.shutdown_timeout_seconds
DeploymentConfig.max_memory_mb
DeploymentConfig.max_cpu_percent
DeploymentConfig.enable_profiling
DeploymentConfig.profiling_interval_seconds
Environment
FeatureFlags
FeatureFlags.authentication
FeatureFlags.authorization
FeatureFlags.backpressure_management
FeatureFlags.cancellation_tokens
FeatureFlags.distributed_mode
FeatureFlags.from_env()
FeatureFlags.gpu_acceleration
FeatureFlags.latency_simulation
FeatureFlags.log_compaction
FeatureFlags.memory_management
FeatureFlags.metrics_export
FeatureFlags.pii_redaction
FeatureFlags.rate_limiting
FeatureFlags.staleness_detection
FeatureFlags.structured_logging
FeatureFlags.telemetry
FeatureFlags.to_dict()
FeatureFlags.view_caching
HealthCheckConfig
HealthCheckConfig.model_config
HealthCheckConfig.enabled
HealthCheckConfig.endpoint
HealthCheckConfig.readiness_endpoint
HealthCheckConfig.liveness_endpoint
HealthCheckConfig.timeout_seconds
HealthCheckConfig.check_interval_seconds
HealthCheckConfig.check_database
HealthCheckConfig.check_event_log
HealthCheckConfig.check_orchestrator
HealthCheckConfig.check_memory_usage
HealthCheckConfig.max_memory_usage_percent
HealthCheckConfig.max_response_time_ms
HealthCheckConfig.max_queue_depth
get_environment()
load_config()
load_config_from_env()
load_config_from_file()
validate_config()
- gunn.core package
- Submodules
- gunn.core.event_log module
EventLogEntry
EventLog
EventLog.append()
EventLog.get_entries_since()
EventLog.get_all_entries()
EventLog.get_latest_seq()
EventLog.get_entry_count()
EventLog.validate_integrity()
EventLog.find_entry_by_uuid()
EventLog.find_entries_by_req_id()
EventLog.get_entries_by_source()
EventLog.get_entries_in_time_range()
EventLog.compact()
EventLog.get_stats()
- gunn.core.orchestrator module
EffectValidator
DefaultEffectValidator
AgentHandle
OrchestratorConfig
Orchestrator
Orchestrator.initialize()
Orchestrator.set_sim_time_authority()
Orchestrator.register_agent()
Orchestrator.register_effect_handler()
Orchestrator.unregister_effect_handler()
Orchestrator.get_registered_effect_kinds()
Orchestrator.set_agent_backpressure_policy()
Orchestrator.broadcast_event()
Orchestrator.submit_intent()
Orchestrator.submit_intents()
Orchestrator.submit_intents_batch()
Orchestrator.issue_cancel_token()
Orchestrator.cancel_if_stale()
Orchestrator.set_agent_interrupt_policy()
Orchestrator.get_agent_interrupt_policy()
Orchestrator.check_and_cancel_stale_tokens()
Orchestrator.cleanup_cancelled_tokens()
Orchestrator.get_active_cancel_tokens()
Orchestrator.get_agent_count()
Orchestrator.get_latest_seq()
Orchestrator.get_world_state()
Orchestrator.get_processing_stats()
Orchestrator.get_memory_stats()
Orchestrator.force_compaction()
Orchestrator.create_snapshot()
Orchestrator.evict_old_views()
Orchestrator.shutdown()
- gunn.core.test_effect_validator module
- Module contents
AgentHandle
DefaultEffectValidator
EffectValidator
EventLog
EventLog.append()
EventLog.compact()
EventLog.find_entries_by_req_id()
EventLog.find_entry_by_uuid()
EventLog.get_all_entries()
EventLog.get_entries_by_source()
EventLog.get_entries_in_time_range()
EventLog.get_entries_since()
EventLog.get_entry_count()
EventLog.get_latest_seq()
EventLog.get_stats()
EventLog.validate_integrity()
EventLogEntry
Orchestrator
Orchestrator.broadcast_event()
Orchestrator.cancel_if_stale()
Orchestrator.check_and_cancel_stale_tokens()
Orchestrator.cleanup_cancelled_tokens()
Orchestrator.create_snapshot()
Orchestrator.evict_old_views()
Orchestrator.force_compaction()
Orchestrator.get_active_cancel_tokens()
Orchestrator.get_agent_count()
Orchestrator.get_agent_interrupt_policy()
Orchestrator.get_latest_seq()
Orchestrator.get_memory_stats()
Orchestrator.get_processing_stats()
Orchestrator.get_registered_effect_kinds()
Orchestrator.get_world_state()
Orchestrator.initialize()
Orchestrator.issue_cancel_token()
Orchestrator.register_agent()
Orchestrator.register_effect_handler()
Orchestrator.set_agent_backpressure_policy()
Orchestrator.set_agent_interrupt_policy()
Orchestrator.set_sim_time_authority()
Orchestrator.shutdown()
Orchestrator.submit_intent()
Orchestrator.submit_intents()
Orchestrator.submit_intents_batch()
Orchestrator.unregister_effect_handler()
OrchestratorConfig
- gunn.facades package
- Submodules
- gunn.facades.message module
MessageSubscription
MessageFacade
MessageFacade.initialize()
MessageFacade.register_agent()
MessageFacade.emit()
MessageFacade.subscribe()
MessageFacade.unsubscribe()
MessageFacade.get_messages()
MessageFacade.wait_for_message()
MessageFacade.shutdown()
MessageFacade.get_orchestrator()
MessageFacade.set_timeout()
MessageFacade.get_agent_subscriptions()
MessageFacade.get_message_queue_size()
- gunn.facades.rl module
- Module contents
MessageFacade
MessageFacade.emit()
MessageFacade.get_agent_subscriptions()
MessageFacade.get_message_queue_size()
MessageFacade.get_messages()
MessageFacade.get_orchestrator()
MessageFacade.initialize()
MessageFacade.register_agent()
MessageFacade.set_timeout()
MessageFacade.shutdown()
MessageFacade.subscribe()
MessageFacade.unsubscribe()
MessageFacade.wait_for_message()
RLFacade
- gunn.policies package
- Submodules
- gunn.policies.observation module
- Module contents
- gunn.schemas package
- gunn.storage package
- gunn.utils package
- Submodules
- gunn.utils.backpressure module
- gunn.utils.errors module
RecoveryAction
SimulationError
StaleContextError
IntentConflictError
QuotaExceededError
TimeoutError
BackpressureError
ValidationError
CircuitBreakerOpenError
CircuitBreaker
ErrorRecoveryPolicy
ErrorRecoveryPolicy.handle_stale_context()
ErrorRecoveryPolicy.handle_intent_conflict()
ErrorRecoveryPolicy.handle_quota_exceeded()
ErrorRecoveryPolicy.handle_timeout()
ErrorRecoveryPolicy.handle_backpressure()
ErrorRecoveryPolicy.handle_validation()
ErrorRecoveryPolicy.handle_circuit_breaker()
ErrorRecoveryPolicy.calculate_retry_delay()
ErrorRecoveryPolicy.should_retry()
LLMGenerationError
LLMTimeoutError
- gunn.utils.hashing module
- gunn.utils.hashing_demo module
- gunn.utils.memory module
- gunn.utils.metrics_exporter module
- gunn.utils.scheduling module
IntentScheduler
WeightedRoundRobinScheduler
WeightedRoundRobinScheduler.set_agent_weight()
WeightedRoundRobinScheduler.enqueue()
WeightedRoundRobinScheduler.dequeue()
WeightedRoundRobinScheduler.get_queue_depth()
WeightedRoundRobinScheduler.get_agent_queue_depths()
WeightedRoundRobinScheduler.get_priority_distribution()
WeightedRoundRobinScheduler.get_stats()
WeightedRoundRobinScheduler.remove_agent()
WeightedRoundRobinScheduler.clear()
PriorityScheduler
- gunn.utils.telemetry module
redact_pii()
pii_redaction_processor()
setup_logging()
setup_tracing()
get_tracer()
get_logger()
log_operation()
PerformanceTimer
async_performance_timer()
record_queue_depth()
record_cancellation()
record_queue_high_watermark()
record_backpressure_event()
record_circuit_breaker_state_change()
record_error_recovery_action()
record_intent_throughput()
record_conflict()
record_observation_delivery_latency()
update_global_seq()
update_view_seq()
update_active_agents_count()
start_metrics_server()
MonotonicClock
get_timing_context()
SystemMonitor
BandwidthMonitor
- gunn.utils.timing module
- gunn.utils.timing_demo module
- Module contents
BackpressureError
BackpressureManager
BackpressureQueue
CircuitBreaker
CircuitBreakerOpenError
DeferPolicy
DropNewestPolicy
ErrorRecoveryPolicy
ErrorRecoveryPolicy.calculate_retry_delay()
ErrorRecoveryPolicy.handle_backpressure()
ErrorRecoveryPolicy.handle_circuit_breaker()
ErrorRecoveryPolicy.handle_intent_conflict()
ErrorRecoveryPolicy.handle_quota_exceeded()
ErrorRecoveryPolicy.handle_stale_context()
ErrorRecoveryPolicy.handle_timeout()
ErrorRecoveryPolicy.handle_validation()
ErrorRecoveryPolicy.should_retry()
IntentConflictError
MemoryConfig
MemoryManager
MemoryStats
PriorityScheduler
QuotaExceededError
RecoveryAction
ShedOldestPolicy
SimulationError
SnapshotManager
StaleContextError
TimedQueue
TimeoutError
ValidationError
ViewCache
WeightedRoundRobinScheduler
WeightedRoundRobinScheduler.clear()
WeightedRoundRobinScheduler.dequeue()
WeightedRoundRobinScheduler.enqueue()
WeightedRoundRobinScheduler.get_agent_queue_depths()
WeightedRoundRobinScheduler.get_priority_distribution()
WeightedRoundRobinScheduler.get_queue_depth()
WeightedRoundRobinScheduler.get_stats()
WeightedRoundRobinScheduler.remove_agent()
WeightedRoundRobinScheduler.set_agent_weight()
WorldStateSnapshot
canonical_json()
chain_checksum()
detect_corruption()
validate_hash_chain()
verify_sequence_integrity()
Module contents¶
gunn - Multi-agent simulation core.
gunn (群) provides a controlled interface for agent-environment interaction, supporting both single and multi-agent settings with partial observation, concurrent execution, and intelligent interruption capabilities.
- class gunn.AgentHandle(agent_id, orchestrator)[ソース]¶
ベースクラス:
object
Per-agent interface for observation and intent submission.
Provides isolated interface for each agent with view sequence tracking and non-blocking operations.
- パラメータ:
agent_id (str)
orchestrator (Orchestrator)
- get_view_seq()[ソース]¶
Get current view sequence number.
- 戻り値の型:
- Returns:
Current view sequence number for this agent
- class gunn.BatchResult(**data)[ソース]¶
ベースクラス:
BaseModel
Result of batch intent processing.
- パラメータ:
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class gunn.ConcurrentIntentProcessor(config)[ソース]¶
ベースクラス:
object
Handles concurrent intent processing logic.
- パラメータ:
config (ConcurrentProcessingConfig)
- get_processing_stats()[ソース]¶
Get statistics about concurrent processing performance.
- Returns:
Dictionary containing processing statistics
- get_recommended_mode(intent_count)[ソース]¶
Get recommended processing mode based on batch characteristics.
- 戻り値の型:
- パラメータ:
intent_count (int)
- Args:
intent_count: Number of intents in the batch
- Returns:
Recommended processing mode
- async process_batch(intents, mode=None, timeout=None)[ソース]¶
Process batch of intents according to specified mode.
- 戻り値の型:
- パラメータ:
mode (ProcessingMode | None)
timeout (float | None)
- Args:
intents: List of intents to process mode: Processing mode (defaults to config default) timeout: Timeout for concurrent operations
- Returns:
BatchResult containing effects, errors, and metadata
- Raises:
ValueError: If no intent processor is set asyncio.TimeoutError: If processing exceeds timeout
- class gunn.ConcurrentProcessingConfig(**data)[ソース]¶
ベースクラス:
BaseModel
Configuration for concurrent intent processing.
- パラメータ:
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
-
processing_mode:
ProcessingMode
¶
- class gunn.DefaultEffectValidator(max_intents_per_minute=60, max_tokens_per_minute=10000, default_cooldown_seconds=1.0, max_payload_size_bytes=10000, allowed_intent_kinds=None)[ソース]¶
ベースクラス:
object
Default implementation of EffectValidator with comprehensive validation.
Provides validation for quota limits, cooldowns, permissions, and world state constraints to ensure intents can be safely executed.
Requirements addressed: - 1.5: Intent validation through EffectValidator before creating an Effect - 3.4: Validation for quota limits, cooldowns, and permissions - 10.3: Structured error codes for validation failures
- パラメータ:
- add_agent_permission(agent_id, permission)[ソース]¶
Add a permission for an agent.
- Args:
agent_id: Agent identifier permission: Permission string to add
- get_agent_quota_status(agent_id)[ソース]¶
Get current quota status for an agent.
- Args:
agent_id: Agent identifier
- Returns:
Dictionary with quota status information
- remove_agent_permission(agent_id, permission)[ソース]¶
Remove a permission for an agent.
- Args:
agent_id: Agent identifier permission: Permission string to remove
- set_agent_permissions(agent_id, permissions)[ソース]¶
Set permissions for an agent.
- Args:
agent_id: Agent identifier permissions: Set of permission strings
- set_intent_kind_cooldown(intent_kind, cooldown_seconds)[ソース]¶
Set cooldown for a specific intent kind.
- Args:
intent_kind: Intent kind to set cooldown for cooldown_seconds: Cooldown duration in seconds
- validate_intent(intent, world_state)[ソース]¶
Validate if intent can be executed.
Performs comprehensive validation including: - Basic structure validation - Permission checks - Quota limit validation - Cooldown enforcement - World state constraint checks - Payload size limits
- 戻り値の型:
- パラメータ:
intent (Intent)
world_state (WorldState)
- Args:
intent: Intent to validate world_state: Current world state
- Returns:
True if intent is valid and can be executed
- Raises:
ValidationError: If validation fails with detailed reasons
- class gunn.EffectValidator(*args, **kwargs)[ソース]¶
ベースクラス:
Protocol
Protocol for validating intents before creating effects.
- set_agent_permissions(agent_id, permissions)[ソース]¶
Set permissions for an agent.
- Args:
agent_id: Agent identifier permissions: Set of permission strings
- validate_intent(intent, world_state)[ソース]¶
Validate if intent can be executed.
- 戻り値の型:
- パラメータ:
intent (Intent)
world_state (WorldState)
- Args:
intent: Intent to validate world_state: Current world state
- Returns:
True if intent is valid and can be executed
- class gunn.EventLog(world_id='default')[ソース]¶
ベースクラス:
object
Append-only event log with hash chain integrity.
Provides thread-safe append operations, replay capabilities, and integrity verification through hash chaining.
Requirements addressed: - 1.2: Record events in global sequential log with global_seq identifier - 7.1: Maintain complete sequential log with global_seq numbers - 7.3: Provide replay capabilities from event log - 7.5: Validate state consistency using log hash/CRC and sequence gap detection
- パラメータ:
world_id (str)
- async append(effect, req_id=None)[ソース]¶
Append effect to log with hash chain checksum.
Thread-safe operation that adds a new effect to the log with proper sequencing and integrity checking.
- Args:
effect: Effect to append to the log req_id: Optional request ID for idempotency tracking
- Returns:
The global_seq assigned to this entry
- Raises:
ValueError: If effect is missing required fields
- async compact(keep_entries=1000)[ソース]¶
Compact the log by removing old entries.
Keeps the most recent entries and removes older ones to manage memory. This operation maintains integrity by preserving the hash chain.
- Args:
keep_entries: Number of recent entries to keep
- Returns:
Number of entries removed
- find_entries_by_req_id(req_id)[ソース]¶
Find entries by request ID.
- 戻り値の型:
- パラメータ:
req_id (str)
- Args:
req_id: Request ID to search for
- Returns:
List of entries with matching request ID
- find_entry_by_uuid(effect_uuid)[ソース]¶
Find entry by effect UUID.
- 戻り値の型:
- パラメータ:
effect_uuid (str)
- Args:
effect_uuid: UUID of the effect to find
- Returns:
EventLogEntry if found, None otherwise
- get_entries_by_source(source_id)[ソース]¶
Get all entries from a specific source.
- 戻り値の型:
- パラメータ:
source_id (str)
- Args:
source_id: Source identifier to filter by
- Returns:
List of entries from the specified source
- get_entries_in_time_range(start_time=None, end_time=None, use_sim_time=True)[ソース]¶
Get entries within a time range.
- 戻り値の型:
- パラメータ:
- Args:
start_time: Start time (inclusive), None for no lower bound end_time: End time (inclusive), None for no upper bound use_sim_time: If True, filter by sim_time; if False, use wall_time
- Returns:
List of entries within the specified time range
- get_entries_since(since_seq)[ソース]¶
Get entries for replay and catch-up.
Returns all entries with global_seq > since_seq in order.
- 戻り値の型:
- パラメータ:
since_seq (int)
- Args:
since_seq: Sequence number to start from (exclusive)
- Returns:
List of entries after the specified sequence number
- get_entry_count()[ソース]¶
Get total number of entries in the log.
- 戻り値の型:
- Returns:
Number of entries in the log
- get_latest_seq()[ソース]¶
Get the latest sequence number in the log.
- 戻り値の型:
- Returns:
Latest global_seq, or 0 if log is empty
- get_stats()[ソース]¶
Get log statistics.
- Returns:
Dictionary with log statistics including entry counts, time ranges, and integrity status
- validate_integrity()[ソース]¶
Validate complete log integrity.
Performs comprehensive integrity checks including: - Hash chain validation - Sequence gap detection - Corruption analysis
- Returns:
Dictionary with integrity report containing: - valid: Overall validity boolean - corrupted_entries: List of corrupted entry indices - missing_sequences: List of detected sequence gaps - total_entries: Total number of entries checked - details: Additional diagnostic information
- class gunn.EventLogEntry(**data)[ソース]¶
ベースクラス:
BaseModel
Single entry in the event log with integrity checking.
Each entry contains an effect along with metadata for ordering, timing, and hash chain integrity verification.
- パラメータ:
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class gunn.MessageFacade(orchestrator=None, config=None, world_id='default', timeout_seconds=30.0)[ソース]¶
ベースクラス:
object
Message-oriented facade wrapping Orchestrator functionality.
Provides an event-driven interface with emit() for broadcasting and subscription-based message delivery while operating on the same underlying event system as other facades.
Requirements addressed: - 5.3: Developers call env.emit() to broadcast events through ObservationPolicy filtering - 5.4: Agents receive messages according to their observation policies - 5.5: Operates on the same underlying event system
- パラメータ:
orchestrator (Orchestrator | None)
config (OrchestratorConfig | None)
world_id (str)
timeout_seconds (float)
- async emit(message_type, payload, source_id, schema_version='1.0.0')[ソース]¶
Broadcast an event through observation policies.
This method implements requirement 5.3: developers call env.emit() to broadcast events through ObservationPolicy filtering.
- Args:
message_type: Type of message being emitted payload: Message payload data source_id: Identifier of the message source schema_version: Schema version for the message
- Raises:
ValueError: If message_type or source_id is invalid BackpressureError: If backpressure limits are exceeded ValidationError: If message validation fails TimeoutError: If emission times out
Requirements addressed: - 5.3: WHEN using message facade THEN developers SHALL call env.emit() to broadcast events through ObservationPolicy filtering
- get_agent_subscriptions(agent_id)[ソース]¶
Get all active subscriptions for an agent.
- 戻り値の型:
- パラメータ:
agent_id (str)
- Args:
agent_id: Agent identifier
- Returns:
List of active subscriptions
- Raises:
ValueError: If agent_id is not registered
- get_message_queue_size(agent_id)[ソース]¶
Get the current size of an agent's message queue.
- Args:
agent_id: Agent identifier
- Returns:
Number of pending messages
- Raises:
ValueError: If agent_id is not registered
- async get_messages(agent_id, timeout=None)[ソース]¶
Get pending messages for an agent.
- Args:
agent_id: Agent identifier timeout: Optional timeout for waiting for messages
- Returns:
List of pending messages
- Raises:
ValueError: If agent_id is not registered TimeoutError: If timeout is exceeded
- get_orchestrator()[ソース]¶
Get the underlying orchestrator instance.
- 戻り値の型:
- Returns:
The orchestrator instance used by this facade
Requirements addressed: - 5.5: Both facades operate on the same underlying event system
- async register_agent(agent_id, policy)[ソース]¶
Register an agent with the simulation.
- 戻り値の型:
- パラメータ:
agent_id (str)
policy (ObservationPolicy)
- Args:
agent_id: Unique identifier for the agent policy: Observation policy for this agent
- Raises:
ValueError: If agent_id is invalid or already registered RuntimeError: If maximum agents exceeded
- set_timeout(timeout_seconds)[ソース]¶
Set the default timeout for operations.
- Args:
timeout_seconds: New timeout value in seconds
- Raises:
ValueError: If timeout_seconds is not positive
- async shutdown()[ソース]¶
Shutdown the message facade and clean up resources.
Cancels all delivery tasks and shuts down the orchestrator.
- 戻り値の型:
- async subscribe(agent_id, message_types=None, handler=None)[ソース]¶
Subscribe an agent to specific message types.
This method implements requirement 5.4: agents receive messages according to their observation policies.
- 戻り値の型:
- パラメータ:
- Args:
agent_id: Agent identifier message_types: Set of message types to subscribe to (None = all types) handler: Optional message handler function
- Returns:
MessageSubscription object for managing the subscription
- Raises:
ValueError: If agent_id is not registered
Requirements addressed: - 5.4: WHEN using message facade THEN agents SHALL receive messages according to their observation policies
- async unsubscribe(subscription)[ソース]¶
Remove a message subscription.
- 戻り値の型:
- パラメータ:
subscription (MessageSubscription)
- Args:
subscription: Subscription to remove
- async wait_for_message(agent_id, message_type=None, timeout=None)[ソース]¶
Wait for a specific message for an agent.
- Args:
agent_id: Agent identifier message_type: Optional specific message type to wait for timeout: Optional timeout for waiting
- Returns:
The received message
- Raises:
ValueError: If agent_id is not registered TimeoutError: If timeout is exceeded
- class gunn.Orchestrator(config, world_id='default', effect_validator=None)[ソース]¶
ベースクラス:
object
Central coordinator for multi-agent simulation.
Coordinates all system operations including event ingestion and ordering, view generation and delta distribution, intent validation and effect creation, and cancellation token management.
Requirements addressed: - 1.1: Create WorldState as single source of truth - 1.3: Generate View based on agent's observation policy - 1.4: Process events using deterministic ordering - 9.1: Deterministic ordering using (sim_time, priority, source_id, uuid) tuple - 9.3: Fixed tie-breaker: priority > source > UUID lexicographic
- パラメータ:
config (OrchestratorConfig)
world_id (str)
effect_validator (EffectValidator | None)
- async broadcast_event(draft)[ソース]¶
Create complete effect from draft and distribute observations.
Converts EffectDraft to complete Effect with priority completion, updates world state, generates observation deltas for affected agents, and delivers observations using timed queues with latency models.
- 戻り値の型:
- パラメータ:
draft (EffectDraft)
- Args:
draft: Effect draft to process and broadcast
- Raises:
ValueError: If draft is missing required fields
Requirements addressed: - 2.2: Generate ObservationDelta patches for affected agents - 2.5: Incremental updates via ObservationDelta patches - 6.4: ObservationDelta delivery latency ≤ 20ms - 6.5: Timed delivery using per-agent TimedQueues with latency models
- async cancel_if_stale(agent_id, req_id, new_view_seq)[ソース]¶
Check staleness and cancel if needed with debounce logic.
Evaluates whether the context has become stale based on the staleness threshold and applies debounce logic to prevent rapid successive interruptions.
- Args:
agent_id: Agent identifier req_id: Request identifier new_view_seq: New view sequence number to check against
- Returns:
True if cancellation occurred
Requirements addressed: - 4.2: Evaluate staleness using latest_view_seq > context_seq + staleness_threshold - 4.3: Cancel current generation when context becomes stale - 4.7: Suppress rapid successive interruptions within debounce window
- async check_and_cancel_stale_tokens(effect)[ソース]¶
Check all active tokens for staleness and cancel if needed.
This method is called during observation distribution to automatically cancel tokens when new events make their context stale.
- Args:
effect: New effect that might make contexts stale
- Returns:
List of request IDs that were cancelled
Requirements addressed: - 4.2: Evaluate staleness when new events occur during generation - 4.3: Cancel current generation when context becomes stale - 4.4: Provide cancellation reason to the agent
- cleanup_cancelled_tokens()[ソース]¶
Clean up cancelled tokens to prevent memory leaks.
- 戻り値の型:
- Returns:
Number of tokens cleaned up
- async create_snapshot()[ソース]¶
Force creation of a WorldState snapshot.
- 戻り値の型:
- Returns:
Created snapshot
- evict_old_views(max_age_seconds=3600.0)[ソース]¶
Evict old cached views to free memory.
- Args:
max_age_seconds: Maximum age for cached views
- Returns:
Number of views evicted
- async force_compaction()[ソース]¶
Force log compaction regardless of thresholds.
- 戻り値の型:
- Returns:
Number of entries removed during compaction
- get_active_cancel_tokens()[ソース]¶
Get active cancel tokens grouped by agent.
- Returns:
Dictionary mapping agent_id to list of active req_ids
- get_agent_count()[ソース]¶
Get number of registered agents.
- 戻り値の型:
- Returns:
Number of currently registered agents
- get_agent_interrupt_policy(agent_id)[ソース]¶
Get interrupt policy for an agent.
- Args:
agent_id: Agent identifier
- Returns:
Interrupt policy ("always" or "only_conflict"), defaults to "always"
- get_latest_seq()[ソース]¶
Get latest global sequence number.
- 戻り値の型:
- Returns:
Latest global sequence number
- get_memory_stats()[ソース]¶
Get comprehensive memory management statistics.
- Returns:
Dictionary with detailed memory statistics including log entries, snapshots, view cache, and compaction information.
- get_processing_stats()[ソース]¶
Get intent processing statistics.
- Returns:
Dictionary with processing statistics
- get_registered_effect_kinds()[ソース]¶
Get list of all registered custom effect kinds.
- Returns:
List of effect kind strings that have registered handlers
- issue_cancel_token(agent_id, req_id, context_digest=None)[ソース]¶
Issue cancellation token for generation tracking.
Creates a new cancellation token with tuple key tracking for the given agent and request. The token can be used to cancel long-running operations like LLM generation when context becomes stale.
- 戻り値の型:
- パラメータ:
- Args:
agent_id: Agent identifier req_id: Request identifier context_digest: Optional context digest for staleness detection
- Returns:
CancelToken for tracking cancellation
- Raises:
ValueError: If agent_id or req_id is empty
Requirements addressed: - 4.1: Issue cancel_token with current context_digest
- async register_agent(agent_id, policy, permissions=None)[ソース]¶
Register a new agent with observation policy and permissions.
- 戻り値の型:
- パラメータ:
agent_id (str)
policy (ObservationPolicy)
- Args:
agent_id: Unique identifier for the agent policy: Observation policy for this agent permissions: Set of permissions for the agent. If None, uses default permissions.
- Returns:
AgentHandle for the registered agent
- Raises:
ValueError: If agent_id is empty or already registered RuntimeError: If maximum agents exceeded
- register_effect_handler(effect_kind, handler)[ソース]¶
Register a custom effect handler for a specific effect kind.
This enables domain-specific effect types beyond the built-in kinds (Move, Speak, Interact, EntityCreated, EntityRemoved, MessageEmitted).
- Args:
effect_kind: The effect kind string to handle (e.g., "Attack", "Heal") handler: Async function that takes (effect, world_state) and applies the effect
- Example:
```python async def handle_attack(effect: Effect, world_state: WorldState) -> None:
attacker_id = effect["source_id"] target_id = effect["payload"]["target_id"] damage = effect["payload"]["damage"] # Apply damage logic to world_state ...
orchestrator.register_effect_handler("Attack", handle_attack) ```
- Notes:
Handlers are called after built-in effect kinds are checked
Handlers should update world_state in-place
Handlers should raise exceptions on failure (will be logged)
Multiple handlers can be registered for different effect kinds
- set_agent_backpressure_policy(agent_id, policy_name)[ソース]¶
Set backpressure policy for a specific agent.
- Args:
agent_id: Agent identifier policy_name: Backpressure policy name (defer, shed_oldest, drop_newest)
- Raises:
ValueError: If agent_id is not registered or policy_name is invalid
Requirements addressed: - 10.2: Configurable backpressure policies per agent class
- set_agent_interrupt_policy(agent_id, policy)[ソース]¶
Set interrupt policy for an agent.
- Args:
agent_id: Agent identifier policy: Interrupt policy ("always" or "only_conflict")
- Raises:
ValueError: If policy is not valid
Requirements addressed: - 4.5: interrupt_on_new_info policy "always" triggers on any new information - 4.6: interrupt_on_new_info policy "only_conflict" triggers only on conflicts
- set_sim_time_authority(authority)[ソース]¶
Set which adapter controls sim_time.
- Args:
authority: Authority for sim_time ("unity", "unreal", or "none")
- async submit_intent(intent)[ソース]¶
Two-phase commit: idempotency → quota → backpressure → priority → fairness → validator → commit.
- Args:
intent: Intent to process
- Returns:
Request ID for tracking
- Raises:
ValueError: If intent is invalid StaleContextError: If intent context is stale QuotaExceededError: If agent quota is exceeded BackpressureError: If backpressure limits are exceeded ValidationError: If intent validation fails
- async submit_intents(intents, sim_time=None)[ソース]¶
Submit multiple intents for simultaneous processing at the same sim_time.
This method enables true simultaneous intent execution within a single tick, maintaining deterministic ordering through (sim_time, priority, source_id, uuid).
- Args:
intents: List of intents to process simultaneously sim_time: Optional explicit sim_time for all intents (default: current time)
- Returns:
List of request IDs for tracking
- Raises:
ValueError: If intents list is empty or contains invalid intents StaleContextError: If any intent context is stale QuotaExceededError: If any agent quota is exceeded BackpressureError: If backpressure limits are exceeded ValidationError: If any intent validation fails
- Notes:
All intents are validated before any are enqueued
If any validation fails, none are processed (atomic batch)
Intents are ordered by priority within the same sim_time
Use this for "speak + move" or other simultaneous actions
- async submit_intents_batch(intents, sim_time=None, processing_mode=None, timeout=None)[ソース]¶
Submit multiple intents using concurrent processing capabilities.
This method provides enhanced batch processing with configurable concurrency modes for improved performance in multi-agent scenarios.
- 戻り値の型:
- パラメータ:
sim_time (float | None)
processing_mode (ProcessingMode | None)
timeout (float | None)
- Args:
intents: List of intents to process sim_time: Optional explicit sim_time for all intents processing_mode: Processing mode (sequential, concurrent, deterministic_concurrent) timeout: Timeout for concurrent operations
- Returns:
BatchResult containing effects, errors, and processing metadata
- Raises:
ValueError: If intents list is empty or contains invalid intents asyncio.TimeoutError: If processing exceeds timeout
- class gunn.OrchestratorConfig(max_agents=100, staleness_threshold=0, debounce_ms=100.0, deadline_ms=5000.0, token_budget=1000, backpressure_policy='defer', default_priority=0, dedup_ttl_minutes=60, max_dedup_entries=10000, dedup_cleanup_interval_minutes=10, dedup_warmup_minutes=5, max_queue_depth=100, quota_intents_per_minute=60, quota_tokens_per_minute=10000, use_in_memory_dedup=False, processing_idle_shutdown_ms=250.0, max_log_entries=10000, view_cache_size=1000, compaction_threshold=5000, snapshot_interval=1000, max_snapshots=10, memory_check_interval_seconds=60.0, auto_compaction_enabled=True, concurrent_processing_config=None)[ソース]¶
ベースクラス:
object
Configuration for the Orchestrator.
- パラメータ:
max_agents (int)
staleness_threshold (int)
debounce_ms (float)
deadline_ms (float)
token_budget (int)
backpressure_policy (str)
default_priority (int)
dedup_ttl_minutes (int)
max_dedup_entries (int)
dedup_cleanup_interval_minutes (int)
dedup_warmup_minutes (int)
max_queue_depth (int)
quota_intents_per_minute (int)
quota_tokens_per_minute (int)
use_in_memory_dedup (bool)
processing_idle_shutdown_ms (float)
max_log_entries (int)
view_cache_size (int)
compaction_threshold (int)
snapshot_interval (int)
max_snapshots (int)
memory_check_interval_seconds (float)
auto_compaction_enabled (bool)
concurrent_processing_config (ConcurrentProcessingConfig | None)
- class gunn.ProcessingMode(*values)[ソース]¶
-
Intent processing modes.
- SEQUENTIAL = 'sequential'¶
- CONCURRENT = 'concurrent'¶
- DETERMINISTIC_CONCURRENT = 'deterministic_concurrent'¶
- class gunn.RLFacade(orchestrator=None, config=None, world_id='default', timeout_seconds=30.0)[ソース]¶
ベースクラス:
object
RL-style facade wrapping Orchestrator functionality.
Provides a familiar RL interface with observe() and step() methods while operating on the same underlying event system as other facades.
Requirements addressed: - 5.1: Developers call env.observe(agent_id) to get observations - 5.2: Developers call env.step(agent_id, intent) and receive (Effect, ObservationDelta) tuple - 5.5: Operates on the same underlying event system
- パラメータ:
orchestrator (Orchestrator | None)
config (OrchestratorConfig | None)
world_id (str)
timeout_seconds (float)
- async cancel_agent_step(agent_id)[ソース]¶
Cancel any pending step operation for an agent.
- Args:
agent_id: Agent identifier
- Returns:
True if a step was cancelled, False if no step was pending
- Raises:
ValueError: If agent_id is not registered
- async get_agent_view_seq(agent_id)[ソース]¶
Get the current view sequence for an agent.
- Args:
agent_id: Agent identifier
- Returns:
Current view sequence number
- Raises:
ValueError: If agent_id is not registered
- get_orchestrator()[ソース]¶
Get the underlying orchestrator instance.
- 戻り値の型:
- Returns:
The orchestrator instance used by this facade
Requirements addressed: - 5.5: Both facades operate on the same underlying event system
- async observe(agent_id)[ソース]¶
Get current observations for an agent.
This method implements requirement 5.1: developers call env.observe(agent_id) to get observations.
- 戻り値の型:
- パラメータ:
agent_id (str)
- Args:
agent_id: Agent identifier
- Returns:
Current observation delta for the agent
- Raises:
ValueError: If agent_id is not registered TimeoutError: If observation retrieval times out RuntimeError: If agent handle is not available
Requirements addressed: - 5.1: WHEN using RL facade THEN developers SHALL call env.observe(agent_id) to get observations
- async register_agent(agent_id, policy)[ソース]¶
Register an agent with the simulation.
- 戻り値の型:
- パラメータ:
agent_id (str)
policy (ObservationPolicy)
- Args:
agent_id: Unique identifier for the agent policy: Observation policy for this agent
- Returns:
AgentHandle for the registered agent
- Raises:
ValueError: If agent_id is invalid or already registered RuntimeError: If maximum agents exceeded
- set_timeout(timeout_seconds)[ソース]¶
Set the default timeout for operations.
- Args:
timeout_seconds: New timeout value in seconds
- Raises:
ValueError: If timeout_seconds is not positive
- async shutdown()[ソース]¶
Shutdown the RL facade and clean up resources.
Cancels any pending step operations and shuts down the orchestrator.
- 戻り値の型:
- async step(agent_id, intent)[ソース]¶
Execute a step with an intent and return the effect and observation.
This method implements requirement 5.2: developers call env.step(agent_id, intent) and receive (Effect, ObservationDelta) tuple.
- 戻り値の型:
- パラメータ:
- Args:
agent_id: Agent identifier intent: Intent to execute
- Returns:
Tuple of (Effect, ObservationDelta) representing the action result
- Raises:
ValueError: If agent_id is not registered or intent is invalid StaleContextError: If intent context is stale QuotaExceededError: If agent quota is exceeded BackpressureError: If backpressure limits are exceeded ValidationError: If intent validation fails TimeoutError: If step execution times out
Requirements addressed: - 5.2: WHEN using RL facade THEN developers SHALL call env.step(agent_id, intent) and receive (Effect, ObservationDelta) tuple