gunn.core package

Submodules

gunn.core.event_log module

Event log implementation for append-only storage with integrity checking.

This module provides the EventLog class that maintains an immutable sequence of effects with hash chain integrity verification and replay capabilities.

class gunn.core.event_log.EventLogEntry(**data)

Bases: BaseModel

Single entry in the event log with integrity checking.

Each entry contains an effect along with metadata for ordering, timing, and hash chain integrity verification.

global_seq: int
sim_time: float
wall_time: float
effect: Effect
source_metadata: dict[str, Any]
checksum: str
req_id: str | None
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class gunn.core.event_log.EventLog(world_id='default')

Bases: object

Append-only event log with hash chain integrity.

Provides thread-safe append operations, replay capabilities, and integrity verification through hash chaining.

Requirements addressed:
  • 1.2: Record events in a global sequential log with a global_seq identifier
  • 7.1: Maintain a complete sequential log with global_seq numbers
  • 7.3: Provide replay capabilities from the event log
  • 7.5: Validate state consistency using log hash/CRC and sequence gap detection

Parameters:

world_id (str)

async append(effect, req_id=None)

Append effect to log with hash chain checksum.

Thread-safe operation that adds a new effect to the log with proper sequencing and integrity checking.

Return type:

int

Args:
  • effect: Effect to append to the log
  • req_id: Optional request ID for idempotency tracking

Returns:

The global_seq assigned to this entry

Raises:

ValueError: If effect is missing required fields
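To illustrate what a hash chain checksum means here, the following is a minimal sketch, not the library's implementation. It assumes SHA-256 over the previous checksum plus a canonical JSON form of the effect, and that sequence numbers start at 1; `TinyLog` and `chain_checksum` are hypothetical names.

```python
import hashlib
import json


def chain_checksum(prev_checksum: str, effect: dict) -> str:
    """Hash the previous checksum together with a canonical form of the effect."""
    payload = json.dumps(effect, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256((prev_checksum + payload).encode("utf-8")).hexdigest()


class TinyLog:
    """Hypothetical in-memory log; each entry's checksum depends on its predecessor."""

    def __init__(self) -> None:
        self.entries: list[dict] = []

    def append(self, effect: dict) -> int:
        prev = self.entries[-1]["checksum"] if self.entries else ""
        seq = len(self.entries) + 1  # assumption: global_seq starts at 1
        self.entries.append(
            {"global_seq": seq, "effect": effect, "checksum": chain_checksum(prev, effect)}
        )
        return seq

    def verify(self) -> bool:
        """Recompute the chain from the start and compare with stored checksums."""
        prev = ""
        for entry in self.entries:
            if entry["checksum"] != chain_checksum(prev, entry["effect"]):
                return False
            prev = entry["checksum"]
        return True
```

Because every checksum includes its predecessor, altering one stored effect invalidates that entry when the chain is recomputed.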

get_entries_since(since_seq)

Get entries for replay and catch-up.

Returns all entries with global_seq > since_seq in order.

Return type:

list[EventLogEntry]

Parameters:

since_seq (int)

Args:

since_seq: Sequence number to start from (exclusive)

Returns:

List of entries after the specified sequence number

get_all_entries()

Get all entries in the log.

Return type:

list[EventLogEntry]

Returns:

Complete list of all log entries

get_latest_seq()

Get the latest sequence number in the log.

Return type:

int

Returns:

Latest global_seq, or 0 if log is empty

get_entry_count()

Get total number of entries in the log.

Return type:

int

Returns:

Number of entries in the log

validate_integrity()

Validate complete log integrity.

Performs comprehensive integrity checks including:
  • Hash chain validation
  • Sequence gap detection
  • Corruption analysis

Return type:

dict[str, Any]

Returns:

Dictionary with integrity report containing:
  • valid: Overall validity boolean
  • corrupted_entries: List of corrupted entry indices
  • missing_sequences: List of detected sequence gaps
  • total_entries: Total number of entries checked
  • details: Additional diagnostic information
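As a rough illustration of sequence gap detection, the sketch below scans an ascending list of sequence numbers and reports the ones that are absent. The helper name `find_missing_sequences` is hypothetical; how the library represents gaps in `missing_sequences` is not specified here.

```python
def find_missing_sequences(seqs: list[int]) -> list[int]:
    """Return sequence numbers skipped between consecutive entries.

    Assumes `seqs` is sorted in ascending order, as in an append-only log.
    """
    missing: list[int] = []
    for prev, cur in zip(seqs, seqs[1:]):
        # Everything strictly between two neighbours is a gap.
        missing.extend(range(prev + 1, cur))
    return missing
```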

find_entry_by_uuid(effect_uuid)

Find entry by effect UUID.

Return type:

EventLogEntry | None

Parameters:

effect_uuid (str)

Args:

effect_uuid: UUID of the effect to find

Returns:

EventLogEntry if found, None otherwise

find_entries_by_req_id(req_id)

Find entries by request ID.

Return type:

list[EventLogEntry]

Parameters:

req_id (str)

Args:

req_id: Request ID to search for

Returns:

List of entries with matching request ID

get_entries_by_source(source_id)

Get all entries from a specific source.

Return type:

list[EventLogEntry]

Parameters:

source_id (str)

Args:

source_id: Source identifier to filter by

Returns:

List of entries from the specified source

get_entries_in_time_range(start_time=None, end_time=None, use_sim_time=True)

Get entries within a time range.

Return type:

list[EventLogEntry]

Parameters:
  • start_time (float | None)

  • end_time (float | None)

  • use_sim_time (bool)

Args:
  • start_time: Start time (inclusive), None for no lower bound
  • end_time: End time (inclusive), None for no upper bound
  • use_sim_time: If True, filter by sim_time; if False, use wall_time

Returns:

List of entries within the specified time range
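The inclusive bounds and the `use_sim_time` switch can be sketched as follows. This is an illustrative stand-alone function over plain dictionaries, not the library's code; `in_time_range` is a hypothetical name.

```python
from typing import Optional


def in_time_range(
    entries: list[dict],
    start_time: Optional[float] = None,
    end_time: Optional[float] = None,
    use_sim_time: bool = True,
) -> list[dict]:
    """Filter entries by sim_time or wall_time; both bounds are inclusive."""
    key = "sim_time" if use_sim_time else "wall_time"
    return [
        e
        for e in entries
        if (start_time is None or e[key] >= start_time)
        and (end_time is None or e[key] <= end_time)
    ]
```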

async compact(keep_entries=1000)

Compact the log by removing old entries.

Keeps the most recent entries and removes older ones to manage memory. This operation maintains integrity by preserving the hash chain.

Return type:

int

Parameters:

keep_entries (int)

Args:

keep_entries: Number of recent entries to keep

Returns:

Number of entries removed

get_stats()

Get log statistics.

Return type:

dict[str, Any]

Returns:

Dictionary with log statistics including entry counts, time ranges, and integrity status

gunn.core.orchestrator module

Core orchestrator for multi-agent simulation.

This module provides the central Orchestrator class that coordinates all system operations including event ingestion, view generation, intent validation, and observation distribution with deterministic ordering.

class gunn.core.orchestrator.EffectValidator(*args, **kwargs)

Bases: Protocol

Protocol for validating intents before creating effects.

validate_intent(intent, world_state)

Validate if intent can be executed.

Return type:

bool

Args:
  • intent: Intent to validate
  • world_state: Current world state

Returns:

True if intent is valid and can be executed

set_agent_permissions(agent_id, permissions)

Set permissions for an agent.

Return type:

None

Args:
  • agent_id: Agent identifier
  • permissions: Set of permission strings

class gunn.core.orchestrator.DefaultEffectValidator(max_intents_per_minute=60, max_tokens_per_minute=10000, default_cooldown_seconds=1.0, max_payload_size_bytes=10000, allowed_intent_kinds=None)

Bases: object

Default implementation of EffectValidator with comprehensive validation.

Provides validation for quota limits, cooldowns, permissions, and world state constraints to ensure intents can be safely executed.

Requirements addressed:
  • 1.5: Intent validation through EffectValidator before creating an Effect
  • 3.4: Validation for quota limits, cooldowns, and permissions
  • 10.3: Structured error codes for validation failures

Parameters:
  • max_intents_per_minute (int)

  • max_tokens_per_minute (int)

  • default_cooldown_seconds (float)

  • max_payload_size_bytes (int)

  • allowed_intent_kinds (set[str] | None)

set_agent_permissions(agent_id, permissions)

Set permissions for an agent.

Return type:

None

Args:
  • agent_id: Agent identifier
  • permissions: Set of permission strings

add_agent_permission(agent_id, permission)

Add a permission for an agent.

Return type:

None

Parameters:
  • agent_id (str)

  • permission (str)

Args:
  • agent_id: Agent identifier
  • permission: Permission string to add

remove_agent_permission(agent_id, permission)

Remove a permission for an agent.

Return type:

None

Parameters:
  • agent_id (str)

  • permission (str)

Args:
  • agent_id: Agent identifier
  • permission: Permission string to remove

set_intent_kind_cooldown(intent_kind, cooldown_seconds)

Set cooldown for a specific intent kind.

Return type:

None

Parameters:
  • intent_kind (str)

  • cooldown_seconds (float)

Args:
  • intent_kind: Intent kind to set cooldown for
  • cooldown_seconds: Cooldown duration in seconds

validate_intent(intent, world_state)

Validate if intent can be executed.

Performs comprehensive validation including:
  • Basic structure validation
  • Permission checks
  • Quota limit validation
  • Cooldown enforcement
  • World state constraint checks
  • Payload size limits

Return type:

bool

Args:
  • intent: Intent to validate
  • world_state: Current world state

Returns:

True if intent is valid and can be executed

Raises:

ValidationError: If validation fails with detailed reasons
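One common way to enforce a per-minute quota such as `max_intents_per_minute` is a sliding window over recent timestamps. The sketch below shows that technique under the assumption of a 60-second window; it is not taken from the validator's source, and `SlidingWindowQuota` is a hypothetical name.

```python
from collections import deque


class SlidingWindowQuota:
    """Allow at most `max_per_minute` events in any trailing 60-second window."""

    def __init__(self, max_per_minute: int) -> None:
        self.max_per_minute = max_per_minute
        self._stamps: deque = deque()  # timestamps (seconds) of accepted events

    def allow(self, now: float) -> bool:
        # Discard timestamps that have left the 60-second window.
        while self._stamps and now - self._stamps[0] >= 60.0:
            self._stamps.popleft()
        if len(self._stamps) >= self.max_per_minute:
            return False
        self._stamps.append(now)
        return True
```

A per-kind cooldown can be handled in a similar spirit by remembering the last accepted timestamp per intent kind and rejecting requests that arrive before the cooldown has elapsed.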

get_agent_quota_status(agent_id)

Get current quota status for an agent.

Return type:

dict[str, Any]

Parameters:

agent_id (str)

Args:

agent_id: Agent identifier

Returns:

Dictionary with quota status information

class gunn.core.orchestrator.AgentHandle(agent_id, orchestrator)

Bases: object

Per-agent interface for observation and intent submission.

Provides an isolated interface for each agent with view sequence tracking and non-blocking operations.

async next_observation()

Get next observation delta from orchestrator's timed queue.

Return type:

Any

Returns:

Next observation delta when available

Raises:

RuntimeError: If agent is not registered or queue is closed

async submit_intent(intent)

Submit intent for validation and processing.

Return type:

str

Parameters:

intent (Intent)

Args:

intent: Intent to submit

Returns:

Request ID for tracking the intent

async cancel(req_id)

Cancel pending intent by req_id.

Return type:

None

Parameters:

req_id (str)

Args:

req_id: Request ID to cancel

get_view_seq()

Get current view sequence number.

Return type:

int

Returns:

Current view sequence number for this agent

class gunn.core.orchestrator.OrchestratorConfig(max_agents=100, staleness_threshold=0, debounce_ms=100.0, deadline_ms=5000.0, token_budget=1000, backpressure_policy='defer', default_priority=0, dedup_ttl_minutes=60, max_dedup_entries=10000, dedup_cleanup_interval_minutes=10, dedup_warmup_minutes=5, max_queue_depth=100, quota_intents_per_minute=60, quota_tokens_per_minute=10000, use_in_memory_dedup=False, processing_idle_shutdown_ms=250.0, max_log_entries=10000, view_cache_size=1000, compaction_threshold=5000, snapshot_interval=1000, max_snapshots=10, memory_check_interval_seconds=60.0, auto_compaction_enabled=True, concurrent_processing_config=None)

Bases: object

Configuration for the Orchestrator.

Parameters:
  • max_agents (int)

  • staleness_threshold (int)

  • debounce_ms (float)

  • deadline_ms (float)

  • token_budget (int)

  • backpressure_policy (str)

  • default_priority (int)

  • dedup_ttl_minutes (int)

  • max_dedup_entries (int)

  • dedup_cleanup_interval_minutes (int)

  • dedup_warmup_minutes (int)

  • max_queue_depth (int)

  • quota_intents_per_minute (int)

  • quota_tokens_per_minute (int)

  • use_in_memory_dedup (bool)

  • processing_idle_shutdown_ms (float)

  • max_log_entries (int)

  • view_cache_size (int)

  • compaction_threshold (int)

  • snapshot_interval (int)

  • max_snapshots (int)

  • memory_check_interval_seconds (float)

  • auto_compaction_enabled (bool)

  • concurrent_processing_config (ConcurrentProcessingConfig | None)

class gunn.core.orchestrator.Orchestrator(config, world_id='default', effect_validator=None)

Bases: object

Central coordinator for multi-agent simulation.

Coordinates all system operations including event ingestion and ordering, view generation and delta distribution, intent validation and effect creation, and cancellation token management.

Requirements addressed:
  • 1.1: Create WorldState as single source of truth
  • 1.3: Generate View based on agent's observation policy
  • 1.4: Process events using deterministic ordering
  • 9.1: Deterministic ordering using (sim_time, priority, source_id, uuid) tuple
  • 9.3: Fixed tie-breaker: priority > source > UUID lexicographic
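The deterministic ordering on the (sim_time, priority, source_id, uuid) tuple can be illustrated with a plain sort key. This is a sketch only: `QueuedEffect` is a hypothetical record type, and the choice that a higher priority value sorts first is an assumption not confirmed by this reference.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class QueuedEffect:
    """Hypothetical record carrying only the fields used for ordering."""

    sim_time: float
    priority: int
    source_id: str
    uuid: str


def ordering_key(e: QueuedEffect) -> tuple:
    # Assumption: higher priority goes first, so the value is negated.
    # Ties fall through to source_id, then uuid, compared lexicographically.
    return (e.sim_time, -e.priority, e.source_id, e.uuid)


def order_effects(effects: list[QueuedEffect]) -> list[QueuedEffect]:
    """Return effects in a total order that does not depend on arrival order."""
    return sorted(effects, key=ordering_key)
```

Since the uuid is unique per effect, the key never ties, so any permutation of the same input produces the same output sequence.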

async initialize()

Initialize orchestrator and start processing pipeline if needed.

Return type:

None

set_sim_time_authority(authority)

Set which adapter controls sim_time.

Return type:

None

Parameters:

authority (Literal['unity', 'unreal', 'none'])

Args:

authority: Authority for sim_time ("unity", "unreal", or "none")

async register_agent(agent_id, policy, permissions=None)

Register a new agent with observation policy and permissions.

Return type:

AgentHandle

Args:
  • agent_id: Unique identifier for the agent
  • policy: Observation policy for this agent
  • permissions: Set of permissions for the agent. If None, uses default permissions.

Returns:

AgentHandle for the registered agent

Raises:
  • ValueError: If agent_id is empty or already registered
  • RuntimeError: If maximum agents exceeded

register_effect_handler(effect_kind, handler)

Register a custom effect handler for a specific effect kind.

This enables domain-specific effect types beyond the built-in kinds (Move, Speak, Interact, EntityCreated, EntityRemoved, MessageEmitted).

Return type:

None

Args:
  • effect_kind: The effect kind string to handle (e.g., "Attack", "Heal")
  • handler: Async function that takes (effect, world_state) and applies the effect

Example:

```python
async def handle_attack(effect: Effect, world_state: WorldState) -> None:
    attacker_id = effect["source_id"]
    target_id = effect["payload"]["target_id"]
    damage = effect["payload"]["damage"]
    # Apply damage logic to world_state
    ...

orchestrator.register_effect_handler("Attack", handle_attack)
```

Notes:
  • Handlers are called after built-in effect kinds are checked

  • Handlers should update world_state in-place

  • Handlers should raise exceptions on failure (will be logged)

  • Multiple handlers can be registered for different effect kinds
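The notes above describe a registry keyed by effect kind. A minimal stand-alone sketch of that pattern follows; `HandlerRegistry` and its method names are hypothetical and use plain dictionaries in place of the library's Effect and WorldState types.

```python
import asyncio
from typing import Awaitable, Callable

Handler = Callable[[dict, dict], Awaitable[None]]


class HandlerRegistry:
    """Hypothetical registry mapping an effect kind to one async handler."""

    def __init__(self) -> None:
        self._handlers: dict = {}

    def register(self, kind: str, handler: Handler) -> None:
        self._handlers[kind] = handler

    def unregister(self, kind: str) -> None:
        del self._handlers[kind]  # raises KeyError if nothing is registered

    def kinds(self) -> list:
        return list(self._handlers)

    async def apply(self, effect: dict, world_state: dict) -> bool:
        """Run the handler for effect["kind"]; return False if none exists."""
        handler = self._handlers.get(effect["kind"])
        if handler is None:
            return False
        await handler(effect, world_state)  # handler mutates world_state in place
        return True
```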

unregister_effect_handler(effect_kind)

Unregister a custom effect handler.

Return type:

None

Parameters:

effect_kind (str)

Args:

effect_kind: The effect kind to unregister

Raises:

KeyError: If no handler is registered for this effect kind

get_registered_effect_kinds()

Get list of all registered custom effect kinds.

Return type:

list[str]

Returns:

List of effect kind strings that have registered handlers

set_agent_backpressure_policy(agent_id, policy_name)

Set backpressure policy for a specific agent.

Return type:

None

Parameters:
  • agent_id (str)

  • policy_name (str)

Args:
  • agent_id: Agent identifier
  • policy_name: Backpressure policy name (defer, shed_oldest, drop_newest)

Raises:

ValueError: If agent_id is not registered or policy_name is invalid

Requirements addressed:
  • 10.2: Configurable backpressure policies per agent class
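The three policy names suggest different reactions to a full queue. The sketch below shows one plausible reading; the semantics are assumptions inferred from the names rather than from the library's source, and `enqueue` and this local `BackpressureError` are hypothetical stand-ins.

```python
from collections import deque


class BackpressureError(Exception):
    """Hypothetical stand-in for the library's BackpressureError."""


def enqueue(queue: deque, item, max_depth: int, policy: str) -> bool:
    """Add `item` to `queue`; return True if it ended up queued."""
    if len(queue) < max_depth:
        queue.append(item)
        return True
    if policy == "defer":
        # Assumed meaning: refuse now and let the caller retry later.
        raise BackpressureError("queue full, retry later")
    if policy == "shed_oldest":
        # Assumed meaning: drop the oldest queued item to make room.
        queue.popleft()
        queue.append(item)
        return True
    if policy == "drop_newest":
        # Assumed meaning: silently discard the incoming item.
        return False
    raise ValueError(f"unknown policy: {policy}")
```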

async broadcast_event(draft)

Create complete effect from draft and distribute observations.

Converts EffectDraft to complete Effect with priority completion, updates world state, generates observation deltas for affected agents, and delivers observations using timed queues with latency models.

Return type:

None

Parameters:

draft (EffectDraft)

Args:

draft: Effect draft to process and broadcast

Raises:

ValueError: If draft is missing required fields

Requirements addressed:
  • 2.2: Generate ObservationDelta patches for affected agents
  • 2.5: Incremental updates via ObservationDelta patches
  • 6.4: ObservationDelta delivery latency ≤ 20ms
  • 6.5: Timed delivery using per-agent TimedQueues with latency models

async submit_intent(intent)

Two-phase commit: idempotency → quota → backpressure → priority → fairness → validator → commit.

Return type:

str

Parameters:

intent (Intent)

Args:

intent: Intent to process

Returns:

Request ID for tracking

Raises:

  • ValueError: If intent is invalid
  • StaleContextError: If intent context is stale
  • QuotaExceededError: If agent quota is exceeded
  • BackpressureError: If backpressure limits are exceeded
  • ValidationError: If intent validation fails

async submit_intents(intents, sim_time=None)

Submit multiple intents for simultaneous processing at the same sim_time.

This method enables true simultaneous intent execution within a single tick, maintaining deterministic ordering through (sim_time, priority, source_id, uuid).

Return type:

list[str]

Args:
  • intents: List of intents to process simultaneously
  • sim_time: Optional explicit sim_time for all intents (default: current time)

Returns:

List of request IDs for tracking

Raises:

  • ValueError: If intents list is empty or contains invalid intents
  • StaleContextError: If any intent context is stale
  • QuotaExceededError: If any agent quota is exceeded
  • BackpressureError: If backpressure limits are exceeded
  • ValidationError: If any intent validation fails

Notes:
  • All intents are validated before any are enqueued

  • If any validation fails, none are processed (atomic batch)

  • Intents are ordered by priority within the same sim_time

  • Use this for "speak + move" or other simultaneous actions
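The all-or-nothing behaviour described in these notes follows a validate-everything-then-enqueue pattern. Below is a minimal sketch of that pattern with plain dictionaries; `submit_batch` is a hypothetical helper, and placing higher priority values first is an assumption.

```python
from typing import Callable


def submit_batch(intents: list, validate: Callable, queue: list) -> list:
    """Validate every intent first; enqueue only if all of them pass."""
    if not intents:
        raise ValueError("intents list is empty")
    # Phase 1: validation. Any exception aborts before the queue is touched.
    for intent in intents:
        validate(intent)
    # Phase 2: enqueue, ordered by priority (assumption: higher value first).
    queue.extend(sorted(intents, key=lambda i: -i["priority"]))
    return [i["req_id"] for i in intents]
```

Keeping validation and enqueueing in separate passes is what makes the batch atomic: a failure on the last intent leaves the queue exactly as it was.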

async submit_intents_batch(intents, sim_time=None, processing_mode=None, timeout=None)

Submit multiple intents using concurrent processing capabilities.

This method provides enhanced batch processing with configurable concurrency modes for improved performance in multi-agent scenarios.

Return type:

BatchResult

Args:
  • intents: List of intents to process
  • sim_time: Optional explicit sim_time for all intents
  • processing_mode: Processing mode (sequential, concurrent, deterministic_concurrent)
  • timeout: Timeout for concurrent operations

Returns:

BatchResult containing effects, errors, and processing metadata

Raises:

  • ValueError: If intents list is empty or contains invalid intents
  • asyncio.TimeoutError: If processing exceeds timeout

issue_cancel_token(agent_id, req_id, context_digest=None)

Issue cancellation token for generation tracking.

Creates a new cancellation token with tuple key tracking for the given agent and request. The token can be used to cancel long-running operations like LLM generation when context becomes stale.

Return type:

CancelToken

Parameters:
  • agent_id (str)

  • req_id (str)

  • context_digest (str | None)

Args:

  • agent_id: Agent identifier
  • req_id: Request identifier
  • context_digest: Optional context digest for staleness detection

Returns:

CancelToken for tracking cancellation

Raises:

ValueError: If agent_id or req_id is empty

Requirements addressed:
  • 4.1: Issue cancel_token with current context_digest

async cancel_if_stale(agent_id, req_id, new_view_seq)

Check staleness and cancel if needed with debounce logic.

Evaluates whether the context has become stale based on the staleness threshold and applies debounce logic to prevent rapid successive interruptions.

Return type:

bool

Parameters:
  • agent_id (str)

  • req_id (str)

  • new_view_seq (int)

Args:

  • agent_id: Agent identifier
  • req_id: Request identifier
  • new_view_seq: New view sequence number to check against

Returns:

True if cancellation occurred

Requirements addressed:
  • 4.2: Evaluate staleness using latest_view_seq > context_seq + staleness_threshold
  • 4.3: Cancel current generation when context becomes stale
  • 4.7: Suppress rapid successive interruptions within debounce window
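The staleness rule in requirement 4.2 and the debounce window in 4.7 can be combined as in the sketch below. `StalenessChecker` is a hypothetical helper with time passed in explicitly (in milliseconds); the real method's bookkeeping may differ.

```python
from typing import Optional


class StalenessChecker:
    """Decide whether to cancel, using the rule from requirement 4.2 plus a debounce."""

    def __init__(self, staleness_threshold: int = 0, debounce_ms: float = 100.0) -> None:
        self.staleness_threshold = staleness_threshold
        self.debounce_ms = debounce_ms
        self._last_cancel_ms: Optional[float] = None

    def should_cancel(self, context_seq: int, latest_view_seq: int, now_ms: float) -> bool:
        # Stale only if latest_view_seq > context_seq + staleness_threshold.
        if latest_view_seq <= context_seq + self.staleness_threshold:
            return False
        # Suppress a second interruption that falls inside the debounce window.
        if self._last_cancel_ms is not None and now_ms - self._last_cancel_ms < self.debounce_ms:
            return False
        self._last_cancel_ms = now_ms
        return True
```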

set_agent_interrupt_policy(agent_id, policy)

Set interrupt policy for an agent.

Return type:

None

Args:
  • agent_id: Agent identifier
  • policy: Interrupt policy ("always" or "only_conflict")

Raises:

ValueError: If policy is not valid

Requirements addressed:
  • 4.5: interrupt_on_new_info policy "always" triggers on any new information
  • 4.6: interrupt_on_new_info policy "only_conflict" triggers only on conflicts

get_agent_interrupt_policy(agent_id)

Get interrupt policy for an agent.

Return type:

str

Parameters:

agent_id (str)

Args:

agent_id: Agent identifier

Returns:

Interrupt policy ("always" or "only_conflict"), defaults to "always"

async check_and_cancel_stale_tokens(effect)

Check all active tokens for staleness and cancel if needed.

This method is called during observation distribution to automatically cancel tokens when new events make their context stale.

Return type:

list[str]

Parameters:

effect (Effect)

Args:

effect: New effect that might make contexts stale

Returns:

List of request IDs that were cancelled

Requirements addressed:
  • 4.2: Evaluate staleness when new events occur during generation
  • 4.3: Cancel current generation when context becomes stale
  • 4.4: Provide cancellation reason to the agent

cleanup_cancelled_tokens()

Clean up cancelled tokens to prevent memory leaks.

Return type:

int

Returns:

Number of tokens cleaned up

get_active_cancel_tokens()

Get active cancel tokens grouped by agent.

Return type:

dict[str, list[str]]

Returns:

Dictionary mapping agent_id to list of active req_ids

get_agent_count()

Get number of registered agents.

Return type:

int

Returns:

Number of currently registered agents

get_latest_seq()

Get latest global sequence number.

Return type:

int

Returns:

Latest global sequence number

get_world_state()

Get current world state.

Return type:

WorldState

Returns:

Current world state (read-only)

get_processing_stats()

Get intent processing statistics.

Return type:

dict[str, Any]

Returns:

Dictionary with processing statistics

get_memory_stats()

Get comprehensive memory management statistics.

Return type:

dict[str, Any]

Returns:

Dictionary with detailed memory statistics including log entries, snapshots, view cache, and compaction information.

async force_compaction()

Force log compaction regardless of thresholds.

Return type:

int

Returns:

Number of entries removed during compaction

async create_snapshot()

Force creation of a WorldState snapshot.

Return type:

Any

Returns:

Created snapshot

evict_old_views(max_age_seconds=3600.0)

Evict old cached views to free memory.

Return type:

int

Parameters:

max_age_seconds (float)

Args:

max_age_seconds: Maximum age for cached views

Returns:

Number of views evicted

async shutdown()

Shutdown orchestrator and clean up resources.

Return type:

None

gunn.core.test_effect_validator module

Module contents

class gunn.core.AgentHandle(agent_id, orchestrator)

Bases: object

Per-agent interface for observation and intent submission.

Provides an isolated interface for each agent with view sequence tracking and non-blocking operations.

async cancel(req_id)

Cancel pending intent by req_id.

Return type:

None

Parameters:

req_id (str)

Args:

req_id: Request ID to cancel

get_view_seq()

Get current view sequence number.

Return type:

int

Returns:

Current view sequence number for this agent

async next_observation()

Get next observation delta from orchestrator's timed queue.

Return type:

Any

Returns:

Next observation delta when available

Raises:

RuntimeError: If agent is not registered or queue is closed

async submit_intent(intent)

Submit intent for validation and processing.

Return type:

str

Parameters:

intent (Intent)

Args:

intent: Intent to submit

Returns:

Request ID for tracking the intent

class gunn.core.DefaultEffectValidator(max_intents_per_minute=60, max_tokens_per_minute=10000, default_cooldown_seconds=1.0, max_payload_size_bytes=10000, allowed_intent_kinds=None)

Bases: object

Default implementation of EffectValidator with comprehensive validation.

Provides validation for quota limits, cooldowns, permissions, and world state constraints to ensure intents can be safely executed.

Requirements addressed:
  • 1.5: Intent validation through EffectValidator before creating an Effect
  • 3.4: Validation for quota limits, cooldowns, and permissions
  • 10.3: Structured error codes for validation failures

Parameters:
  • max_intents_per_minute (int)

  • max_tokens_per_minute (int)

  • default_cooldown_seconds (float)

  • max_payload_size_bytes (int)

  • allowed_intent_kinds (set[str] | None)

add_agent_permission(agent_id, permission)

Add a permission for an agent.

Return type:

None

Parameters:
  • agent_id (str)

  • permission (str)

Args:
  • agent_id: Agent identifier
  • permission: Permission string to add

get_agent_quota_status(agent_id)

Get current quota status for an agent.

Return type:

dict[str, Any]

Parameters:

agent_id (str)

Args:

agent_id: Agent identifier

Returns:

Dictionary with quota status information

remove_agent_permission(agent_id, permission)

Remove a permission for an agent.

Return type:

None

Parameters:
  • agent_id (str)

  • permission (str)

Args:
  • agent_id: Agent identifier
  • permission: Permission string to remove

set_agent_permissions(agent_id, permissions)

Set permissions for an agent.

Return type:

None

Args:
  • agent_id: Agent identifier
  • permissions: Set of permission strings

set_intent_kind_cooldown(intent_kind, cooldown_seconds)

Set cooldown for a specific intent kind.

Return type:

None

Parameters:
  • intent_kind (str)

  • cooldown_seconds (float)

Args:
  • intent_kind: Intent kind to set cooldown for
  • cooldown_seconds: Cooldown duration in seconds

validate_intent(intent, world_state)

Validate if intent can be executed.

Performs comprehensive validation including:
  • Basic structure validation
  • Permission checks
  • Quota limit validation
  • Cooldown enforcement
  • World state constraint checks
  • Payload size limits

Return type:

bool

Args:
  • intent: Intent to validate
  • world_state: Current world state

Returns:

True if intent is valid and can be executed

Raises:

ValidationError: If validation fails with detailed reasons

class gunn.core.EffectValidator(*args, **kwargs)

Bases: Protocol

Protocol for validating intents before creating effects.

set_agent_permissions(agent_id, permissions)

Set permissions for an agent.

Return type:

None

Args:
  • agent_id: Agent identifier
  • permissions: Set of permission strings

validate_intent(intent, world_state)

Validate if intent can be executed.

Return type:

bool

Args:
  • intent: Intent to validate
  • world_state: Current world state

Returns:

True if intent is valid and can be executed

class gunn.core.EventLog(world_id='default')

Bases: object

Append-only event log with hash chain integrity.

Provides thread-safe append operations, replay capabilities, and integrity verification through hash chaining.

Requirements addressed:
  • 1.2: Record events in a global sequential log with a global_seq identifier
  • 7.1: Maintain a complete sequential log with global_seq numbers
  • 7.3: Provide replay capabilities from the event log
  • 7.5: Validate state consistency using log hash/CRC and sequence gap detection

Parameters:

world_id (str)

async append(effect, req_id=None)

Append effect to log with hash chain checksum.

Thread-safe operation that adds a new effect to the log with proper sequencing and integrity checking.

Return type:

int

Args:
  • effect: Effect to append to the log
  • req_id: Optional request ID for idempotency tracking

Returns:

The global_seq assigned to this entry

Raises:

ValueError: If effect is missing required fields

async compact(keep_entries=1000)

Compact the log by removing old entries.

Keeps the most recent entries and removes older ones to manage memory. This operation maintains integrity by preserving the hash chain.

Return type:

int

Parameters:

keep_entries (int)

Args:

keep_entries: Number of recent entries to keep

Returns:

Number of entries removed

find_entries_by_req_id(req_id)

Find entries by request ID.

Return type:

list[EventLogEntry]

Parameters:

req_id (str)

Args:

req_id: Request ID to search for

Returns:

List of entries with matching request ID

find_entry_by_uuid(effect_uuid)

Find entry by effect UUID.

Return type:

EventLogEntry | None

Parameters:

effect_uuid (str)

Args:

effect_uuid: UUID of the effect to find

Returns:

EventLogEntry if found, None otherwise

get_all_entries()

Get all entries in the log.

Return type:

list[EventLogEntry]

Returns:

Complete list of all log entries

get_entries_by_source(source_id)

Get all entries from a specific source.

Return type:

list[EventLogEntry]

Parameters:

source_id (str)

Args:

source_id: Source identifier to filter by

Returns:

List of entries from the specified source

get_entries_in_time_range(start_time=None, end_time=None, use_sim_time=True)

Get entries within a time range.

Return type:

list[EventLogEntry]

Parameters:
  • start_time (float | None)

  • end_time (float | None)

  • use_sim_time (bool)

Args:
  • start_time: Start time (inclusive), None for no lower bound
  • end_time: End time (inclusive), None for no upper bound
  • use_sim_time: If True, filter by sim_time; if False, use wall_time

Returns:

List of entries within the specified time range

get_entries_since(since_seq)

Get entries for replay and catch-up.

Returns all entries with global_seq > since_seq in order.

Return type:

list[EventLogEntry]

Parameters:

since_seq (int)

Args:

since_seq: Sequence number to start from (exclusive)

Returns:

List of entries after the specified sequence number

get_entry_count()

Get total number of entries in the log.

Return type:

int

Returns:

Number of entries in the log

get_latest_seq()

Get the latest sequence number in the log.

Return type:

int

Returns:

Latest global_seq, or 0 if log is empty

get_stats()

Get log statistics.

Return type:

dict[str, Any]

Returns:

Dictionary with log statistics including entry counts, time ranges, and integrity status

validate_integrity()

Validate complete log integrity.

Performs comprehensive integrity checks including:
  • Hash chain validation
  • Sequence gap detection
  • Corruption analysis

Return type:

dict[str, Any]

Returns:

Dictionary with integrity report containing:
  • valid: Overall validity boolean
  • corrupted_entries: List of corrupted entry indices
  • missing_sequences: List of detected sequence gaps
  • total_entries: Total number of entries checked
  • details: Additional diagnostic information

class gunn.core.EventLogEntry(**data)

Bases: BaseModel

Single entry in the event log with integrity checking.

Each entry contains an effect along with metadata for ordering, timing, and hash chain integrity verification.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

global_seq: int
sim_time: float
wall_time: float
effect: Effect
source_metadata: dict[str, Any]
checksum: str
req_id: str | None

class gunn.core.Orchestrator(config, world_id='default', effect_validator=None)

Bases: object

Central coordinator for multi-agent simulation.

Coordinates all system operations including event ingestion and ordering, view generation and delta distribution, intent validation and effect creation, and cancellation token management.

Requirements addressed:
  • 1.1: Create WorldState as single source of truth
  • 1.3: Generate View based on agent's observation policy
  • 1.4: Process events using deterministic ordering
  • 9.1: Deterministic ordering using (sim_time, priority, source_id, uuid) tuple
  • 9.3: Fixed tie-breaker: priority > source > UUID lexicographic

async broadcast_event(draft)

Create complete effect from draft and distribute observations.

Converts EffectDraft to complete Effect with priority completion, updates world state, generates observation deltas for affected agents, and delivers observations using timed queues with latency models.

Return type:

None

Parameters:

draft (EffectDraft)

Args:

draft: Effect draft to process and broadcast

Raises:

ValueError: If draft is missing required fields

Requirements addressed:
  • 2.2: Generate ObservationDelta patches for affected agents
  • 2.5: Incremental updates via ObservationDelta patches
  • 6.4: ObservationDelta delivery latency ≤ 20ms
  • 6.5: Timed delivery using per-agent TimedQueues with latency models

async cancel_if_stale(agent_id, req_id, new_view_seq)

Check staleness and cancel if needed with debounce logic.

Evaluates whether the context has become stale based on the staleness threshold and applies debounce logic to prevent rapid successive interruptions.

Return type:

bool

Parameters:
  • agent_id (str)

  • req_id (str)

  • new_view_seq (int)

Args:

  • agent_id: Agent identifier
  • req_id: Request identifier
  • new_view_seq: New view sequence number to check against

Returns:

True if cancellation occurred

Requirements addressed:
  • 4.2: Evaluate staleness using latest_view_seq > context_seq + staleness_threshold
  • 4.3: Cancel current generation when context becomes stale
  • 4.7: Suppress rapid successive interruptions within debounce window

async check_and_cancel_stale_tokens(effect)

Check all active tokens for staleness and cancel if needed.

This method is called during observation distribution to automatically cancel tokens when new events make their context stale.

Return type:

list[str]

Parameters:

effect (Effect)

Args:

effect: New effect that might make contexts stale

Returns:

List of request IDs that were cancelled

Requirements addressed:
  • 4.2: Evaluate staleness when new events occur during generation
  • 4.3: Cancel current generation when context becomes stale
  • 4.4: Provide cancellation reason to the agent

cleanup_cancelled_tokens()

Clean up cancelled tokens to prevent memory leaks.

Return type:

int

Returns:

Number of tokens cleaned up

async create_snapshot()[source]

Force creation of a WorldState snapshot.

Return type:

Any

Returns:

Created snapshot

evict_old_views(max_age_seconds=3600.0)[source]

Evict old cached views to free memory.

Return type:

int

Parameters:

max_age_seconds (float)

Args:

max_age_seconds: Maximum age for cached views

Returns:

Number of views evicted
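
Age-based eviction can be illustrated with a plain dictionary. This sketch assumes each cached view is tracked by a last-access timestamp in seconds. The evict_stale function and the cache layout are hypothetical and do not reflect how the Orchestrator stores views internally.

```python
import time
from typing import Optional


def evict_stale(
    cache: dict,
    max_age_seconds: float = 3600.0,
    now: Optional[float] = None,
) -> int:
    """Remove entries older than max_age_seconds; return how many were removed.

    `cache` maps a view key to its last-access timestamp (assumed layout).
    """
    now = time.monotonic() if now is None else now
    # Collect keys first so the dict is not mutated while iterating.
    stale_keys = [key for key, ts in cache.items() if now - ts > max_age_seconds]
    for key in stale_keys:
        del cache[key]
    return len(stale_keys)
```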

async force_compaction()[source]

Force log compaction regardless of thresholds.

Return type:

int

Returns:

Number of entries removed during compaction

get_active_cancel_tokens()[source]

Get active cancel tokens grouped by agent.

Return type:

dict[str, list[str]]

Returns:

Dictionary mapping agent_id to list of active req_ids

get_agent_count()[source]

Get number of registered agents.

Return type:

int

Returns:

Number of currently registered agents

get_agent_interrupt_policy(agent_id)[source]

Get interrupt policy for an agent.

Return type:

str

Parameters:

agent_id (str)

Args:

agent_id: Agent identifier

Returns:

Interrupt policy ("always" or "only_conflict"), defaults to "always"

get_latest_seq()[source]

Get latest global sequence number.

Return type:

int

Returns:

Latest global sequence number

get_memory_stats()[source]

Get comprehensive memory management statistics.

Return type:

dict[str, Any]

Returns:

Dictionary with detailed memory statistics including log entries, snapshots, view cache, and compaction information.

get_processing_stats()[source]

Get intent processing statistics.

Return type:

dict[str, Any]

Returns:

Dictionary with processing statistics

get_registered_effect_kinds()[source]

Get list of all registered custom effect kinds.

Return type:

list[str]

Returns:

List of effect kind strings that have registered handlers

get_world_state()[source]

Get current world state.

Return type:

WorldState

Returns:

Current world state (read-only)

async initialize()[source]

Initialize orchestrator and start processing pipeline if needed.

Return type:

None

issue_cancel_token(agent_id, req_id, context_digest=None)[source]

Issue cancellation token for generation tracking.

Creates a new cancellation token with tuple key tracking for the given agent and request. The token can be used to cancel long-running operations like LLM generation when context becomes stale.

Return type:

CancelToken

Parameters:
  • agent_id (str)

  • req_id (str)

  • context_digest (str | None)

Args:

agent_id: Agent identifier
req_id: Request identifier
context_digest: Optional context digest for staleness detection

Returns:

CancelToken for tracking cancellation

Raises:

ValueError: If agent_id or req_id is empty

Requirements addressed:
  • 4.1: Issue cancel_token with current context_digest
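
The token lifecycle described by issue_cancel_token, get_active_cancel_tokens, and cleanup_cancelled_tokens can be sketched with a dictionary keyed by an (agent_id, req_id) tuple. That key shape is an assumption drawn from the phrase "tuple key tracking". SketchToken and TokenRegistry are hypothetical stand-ins, not gunn's CancelToken or Orchestrator.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Optional


@dataclass
class SketchToken:
    """Hypothetical stand-in for a cancellation token."""

    agent_id: str
    req_id: str
    context_digest: Optional[str] = None
    cancelled: bool = False
    reason: Optional[str] = None

    def cancel(self, reason: str) -> None:
        self.cancelled = True
        self.reason = reason


class TokenRegistry:
    """Hypothetical registry keyed by (agent_id, req_id)."""

    def __init__(self) -> None:
        self._tokens = {}

    def issue(self, agent_id: str, req_id: str, context_digest=None) -> SketchToken:
        if not agent_id or not req_id:
            raise ValueError("agent_id and req_id must be non-empty")
        token = SketchToken(agent_id, req_id, context_digest)
        self._tokens[(agent_id, req_id)] = token
        return token

    def active_by_agent(self) -> dict:
        # Group the req_ids of tokens that are still active under each agent.
        grouped = defaultdict(list)
        for (agent_id, req_id), token in self._tokens.items():
            if not token.cancelled:
                grouped[agent_id].append(req_id)
        return dict(grouped)

    def cleanup_cancelled(self) -> int:
        # Drop cancelled tokens so the registry does not grow without bound.
        dead = [key for key, token in self._tokens.items() if token.cancelled]
        for key in dead:
            del self._tokens[key]
        return len(dead)
```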

async register_agent(agent_id, policy, permissions=None)[source]

Register a new agent with observation policy and permissions.

Return type:

AgentHandle

Args:

agent_id: Unique identifier for the agent
policy: Observation policy for this agent
permissions: Set of permissions for the agent. If None, uses default permissions.

Returns:

AgentHandle for the registered agent

Raises:

ValueError: If agent_id is empty or already registered
RuntimeError: If maximum agents exceeded

register_effect_handler(effect_kind, handler)[source]

Register a custom effect handler for a specific effect kind.

This enables domain-specific effect types beyond the built-in kinds (Move, Speak, Interact, EntityCreated, EntityRemoved, MessageEmitted).

Return type:

None

Args:

effect_kind: The effect kind string to handle (e.g., "Attack", "Heal")
handler: Async function that takes (effect, world_state) and applies the effect

Example:

```python
async def handle_attack(effect: Effect, world_state: WorldState) -> None:
    attacker_id = effect["source_id"]
    target_id = effect["payload"]["target_id"]
    damage = effect["payload"]["damage"]
    # Apply damage logic to world_state
    ...

orchestrator.register_effect_handler("Attack", handle_attack)
```

Notes:
  • Handlers are called after built-in effect kinds are checked

  • Handlers should update world_state in-place

  • Handlers should raise exceptions on failure (will be logged)

  • Multiple handlers can be registered for different effect kinds
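
The notes above describe a lookup that runs only after the built-in kinds have been checked. A minimal sketch of such a registry follows. HandlerRegistry is hypothetical, effects and world state are plain dictionaries here, and the "kind" key is an assumed field name rather than a confirmed part of the Effect schema.

```python
import asyncio
from typing import Awaitable, Callable

# Built-in kinds listed in the documentation above.
BUILTIN_KINDS = {
    "Move", "Speak", "Interact",
    "EntityCreated", "EntityRemoved", "MessageEmitted",
}

Handler = Callable[[dict, dict], Awaitable[None]]


class HandlerRegistry:
    """Hypothetical registry; one handler per custom effect kind."""

    def __init__(self) -> None:
        self._handlers = {}

    def register(self, effect_kind: str, handler: Handler) -> None:
        self._handlers[effect_kind] = handler

    def unregister(self, effect_kind: str) -> None:
        # Raises KeyError when no handler is registered for this kind.
        del self._handlers[effect_kind]

    def kinds(self) -> list:
        return list(self._handlers)

    async def apply(self, effect: dict, world_state: dict) -> bool:
        """Return True when a custom handler ran for this effect."""
        kind = effect["kind"]  # assumed field name
        if kind in BUILTIN_KINDS:
            return False  # built-in kinds are handled elsewhere
        handler = self._handlers.get(kind)
        if handler is None:
            return False
        await handler(effect, world_state)  # handler mutates world_state in place
        return True
```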

set_agent_backpressure_policy(agent_id, policy_name)[source]

Set backpressure policy for a specific agent.

Return type:

None

Parameters:
  • agent_id (str)

  • policy_name (str)

Args:

agent_id: Agent identifier
policy_name: Backpressure policy name (defer, shed_oldest, drop_newest)

Raises:

ValueError: If agent_id is not registered or policy_name is invalid

Requirements addressed:
  • 10.2: Configurable backpressure policies per agent class
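
The three policy names suggest different reactions to a full queue. The sketch below shows one plausible reading, and the semantics are assumptions rather than documented behavior: "defer" raises so the caller can retry later, "shed_oldest" evicts the head of the queue to make room, and "drop_newest" discards the incoming item. The enqueue function and QueueFullError are hypothetical names.

```python
from collections import deque

POLICIES = ("defer", "shed_oldest", "drop_newest")


class QueueFullError(Exception):
    """Raised by the 'defer' policy in this sketch when the queue is full."""


def enqueue(queue: deque, item, max_depth: int, policy: str) -> bool:
    """Try to add item; return True if it ended up in the queue."""
    if policy not in POLICIES:
        raise ValueError(f"invalid backpressure policy: {policy!r}")
    if len(queue) < max_depth:
        queue.append(item)
        return True
    if policy == "defer":
        # Assumed meaning: push the problem back to the caller.
        raise QueueFullError("queue full; retry later")
    if policy == "shed_oldest":
        # Assumed meaning: evict the oldest queued item to make room.
        queue.popleft()
        queue.append(item)
        return True
    # drop_newest (assumed meaning): silently discard the incoming item.
    return False
```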

set_agent_interrupt_policy(agent_id, policy)[source]

Set interrupt policy for an agent.

Return type:

None

Args:

agent_id: Agent identifier
policy: Interrupt policy ("always" or "only_conflict")

Raises:

ValueError: If policy is not valid

Requirements addressed:
  • 4.5: interrupt_on_new_info policy "always" triggers on any new information

  • 4.6: interrupt_on_new_info policy "only_conflict" triggers only on conflicts
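
The difference between the two policies reduces to a small decision function. In this sketch, should_interrupt and its boolean inputs are hypothetical. How the Orchestrator decides that an effect conflicts with an agent's in-flight generation is not specified here and is left to the caller.

```python
def should_interrupt(policy: str, has_new_info: bool, has_conflict: bool) -> bool:
    """Decide whether new information should interrupt a generation."""
    if policy == "always":
        # Requirement 4.5: any new information triggers an interrupt.
        return has_new_info
    if policy == "only_conflict":
        # Requirement 4.6: interrupt only when the new information conflicts.
        return has_new_info and has_conflict
    raise ValueError(f"invalid interrupt policy: {policy!r}")
```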

set_sim_time_authority(authority)[source]

Set which adapter controls sim_time.

Return type:

None

Parameters:

authority (Literal['unity', 'unreal', 'none'])

Args:

authority: Authority for sim_time ("unity", "unreal", or "none")

async shutdown()[source]

Shutdown orchestrator and clean up resources.

Return type:

None

async submit_intent(intent)[source]

Two-phase commit: idempotency → quota → backpressure → priority → fairness → validator → commit.

Return type:

str

Parameters:

intent (Intent)

Args:

intent: Intent to process

Returns:

Request ID for tracking

Raises:

ValueError: If intent is invalid
StaleContextError: If intent context is stale
QuotaExceededError: If agent quota is exceeded
BackpressureError: If backpressure limits are exceeded
ValidationError: If intent validation fails

async submit_intents(intents, sim_time=None)[source]

Submit multiple intents for simultaneous processing at the same sim_time.

This method enables true simultaneous intent execution within a single tick, maintaining deterministic ordering through (sim_time, priority, source_id, uuid).

Return type:

list[str]

Args:

intents: List of intents to process simultaneously
sim_time: Optional explicit sim_time for all intents (default: current time)

Returns:

List of request IDs for tracking

Raises:

ValueError: If intents list is empty or contains invalid intents
StaleContextError: If any intent context is stale
QuotaExceededError: If any agent quota is exceeded
BackpressureError: If backpressure limits are exceeded
ValidationError: If any intent validation fails

Notes:
  • All intents are validated before any are enqueued

  • If any validation fails, none are processed (atomic batch)

  • Intents are ordered by priority within the same sim_time

  • Use this for "speak + move" or other simultaneous actions
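
The atomic-batch and deterministic-ordering notes can be sketched together: validate every intent first, then enqueue the whole batch sorted by the (sim_time, priority, source_id, uuid) tuple. SketchIntent and submit_atomic are hypothetical, and the ascending sort on priority is an assumption, since the documentation does not say whether higher or lower values run first.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class SketchIntent:
    """Hypothetical intent carrying only the ordering fields."""

    sim_time: float
    priority: int
    source_id: str
    uuid: str


def order_key(intent: SketchIntent) -> tuple:
    # Ties on sim_time fall through to priority, then source_id, then uuid,
    # so the resulting order never depends on arrival order.
    return (intent.sim_time, intent.priority, intent.source_id, intent.uuid)


def submit_atomic(intents: list, queue: list, sim_time: Optional[float] = None) -> list:
    """Validate all intents, then enqueue them together; return their ids."""
    if not intents:
        raise ValueError("intents list is empty")
    if sim_time is not None:
        # Stamp the whole batch with one explicit sim_time.
        intents = [replace(i, sim_time=sim_time) for i in intents]
    # Phase 1: validate everything before touching the queue.
    for intent in intents:
        if not intent.source_id or not intent.uuid:
            raise ValueError(f"invalid intent: {intent!r}")
    # Phase 2: enqueue in deterministic order.
    queue.extend(sorted(intents, key=order_key))
    return [intent.uuid for intent in intents]
```

Because validation finishes before the first append, a single bad intent leaves the queue untouched.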

async submit_intents_batch(intents, sim_time=None, processing_mode=None, timeout=None)[source]

Submit multiple intents using concurrent processing capabilities.

This method provides enhanced batch processing with configurable concurrency modes for improved performance in multi-agent scenarios.

Return type:

BatchResult

Args:

intents: List of intents to process
sim_time: Optional explicit sim_time for all intents
processing_mode: Processing mode (sequential, concurrent, deterministic_concurrent)
timeout: Timeout for concurrent operations

Returns:

BatchResult containing effects, errors, and processing metadata

Raises:

ValueError: If intents list is empty or contains invalid intents
asyncio.TimeoutError: If processing exceeds timeout
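
Concurrent processing under a single timeout, with results split into successes and errors, can be sketched with the standard asyncio primitives. The process_batch function and its (effects, errors) return tuple are hypothetical simplifications of BatchResult, and the timeout is assumed to be in seconds as asyncio.wait_for expects.

```python
import asyncio
from typing import Optional


async def process_batch(items: list, worker, timeout: Optional[float] = None):
    """Run worker(item) for every item concurrently under one timeout.

    Returns (effects, errors). Raises asyncio.TimeoutError when the whole
    batch does not finish within `timeout` seconds.
    """
    if not items:
        raise ValueError("intents list is empty")
    # return_exceptions=True keeps one failure from cancelling the others.
    gathered = asyncio.gather(
        *(worker(item) for item in items), return_exceptions=True
    )
    results = await asyncio.wait_for(gathered, timeout)
    effects = [r for r in results if not isinstance(r, Exception)]
    errors = [r for r in results if isinstance(r, Exception)]
    return effects, errors
```

asyncio.gather preserves input order in its results, so the effects list stays in submission order even though the workers run concurrently.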

unregister_effect_handler(effect_kind)[source]

Unregister a custom effect handler.

Return type:

None

Parameters:

effect_kind (str)

Args:

effect_kind: The effect kind to unregister

Raises:

KeyError: If no handler is registered for this effect kind

class gunn.core.OrchestratorConfig(max_agents=100, staleness_threshold=0, debounce_ms=100.0, deadline_ms=5000.0, token_budget=1000, backpressure_policy='defer', default_priority=0, dedup_ttl_minutes=60, max_dedup_entries=10000, dedup_cleanup_interval_minutes=10, dedup_warmup_minutes=5, max_queue_depth=100, quota_intents_per_minute=60, quota_tokens_per_minute=10000, use_in_memory_dedup=False, processing_idle_shutdown_ms=250.0, max_log_entries=10000, view_cache_size=1000, compaction_threshold=5000, snapshot_interval=1000, max_snapshots=10, memory_check_interval_seconds=60.0, auto_compaction_enabled=True, concurrent_processing_config=None)[source]

Bases: object

Configuration for the Orchestrator.

Parameters:
  • max_agents (int)

  • staleness_threshold (int)

  • debounce_ms (float)

  • deadline_ms (float)

  • token_budget (int)

  • backpressure_policy (str)

  • default_priority (int)

  • dedup_ttl_minutes (int)

  • max_dedup_entries (int)

  • dedup_cleanup_interval_minutes (int)

  • dedup_warmup_minutes (int)

  • max_queue_depth (int)

  • quota_intents_per_minute (int)

  • quota_tokens_per_minute (int)

  • use_in_memory_dedup (bool)

  • processing_idle_shutdown_ms (float)

  • max_log_entries (int)

  • view_cache_size (int)

  • compaction_threshold (int)

  • snapshot_interval (int)

  • max_snapshots (int)

  • memory_check_interval_seconds (float)

  • auto_compaction_enabled (bool)

  • concurrent_processing_config (ConcurrentProcessingConfig | None)