gunn.utils package

Submodules

gunn.utils.backpressure module

Backpressure policies for queue management.

This module implements different backpressure policies to handle queue overflow and resource exhaustion scenarios with configurable strategies.

class gunn.utils.backpressure.BackpressurePolicy(threshold, agent_id='unknown')

Bases: ABC, Generic

Abstract base class for backpressure policies.

Defines the interface for handling queue overflow scenarios with different strategies for managing resource exhaustion.

Requirements addressed:
  • 10.2: Backpressure policies (defer, shed oldest, drop newest)
  • 10.5: Queue depth monitoring and backpressure triggers

Parameters:
  • threshold (int)

  • agent_id (str)

abstractmethod async handle_overflow(queue, new_item)

Handle queue overflow when threshold is exceeded.

Return type:

bool

Parameters:
  • queue (deque)

  • new_item (T)

Args:

queue: The queue that is overflowing
new_item: New item trying to be added

Returns:

True if the item was handled (added or dropped), False if it should be deferred

Raises:

BackpressureError: If backpressure should be applied

abstract property policy_name: str

Get the policy name for metrics and logging.

check_threshold(current_depth, queue_type='queue')

Check if queue depth exceeds threshold and record metrics.

Return type:

None

Parameters:
  • current_depth (int)

  • queue_type (str)

Args:

current_depth: Current queue depth
queue_type: Type of queue for metrics

Raises:

BackpressureError: If threshold is exceeded

class gunn.utils.backpressure.DeferPolicy(threshold, agent_id='unknown')

Bases: BackpressurePolicy, Generic

Defer policy - block new items when threshold exceeded.

This is the default policy that raises BackpressureError to defer processing until queue depth decreases below threshold.

Requirements addressed:
  • 10.2: Backpressure policies with defer as default

Parameters:
  • threshold (int)

  • agent_id (str)

property policy_name: str

Get the policy name for metrics and logging.

async handle_overflow(queue, new_item)

Handle overflow by deferring (raising BackpressureError).

Return type:

bool

Parameters:
  • queue (deque)

  • new_item (T)

Args:

queue: The overflowing queue
new_item: New item to be deferred

Returns:

False (never handles the item, always defers)

Raises:

BackpressureError: Always raised to defer the operation

class gunn.utils.backpressure.ShedOldestPolicy(threshold, agent_id='unknown')

Bases: BackpressurePolicy, Generic

Shed oldest policy - drop oldest items to make room for new ones.

When queue exceeds threshold, removes oldest items to make room for new items, maintaining queue size at threshold.

Requirements addressed:
  • 10.2: Backpressure policies with shed oldest option

Parameters:
  • threshold (int)

  • agent_id (str)

property policy_name: str

Get the policy name for metrics and logging.

async handle_overflow(queue, new_item)

Handle overflow by dropping oldest items.

Return type:

bool

Parameters:
  • queue (deque)

  • new_item (T)

Args:

queue: The overflowing queue
new_item: New item to add

Returns:

True (item was handled by adding and shedding old items)

class gunn.utils.backpressure.DropNewestPolicy(threshold, agent_id='unknown')

Bases: BackpressurePolicy, Generic

Drop newest policy - drop new items when threshold exceeded.

When queue exceeds threshold, drops the new item instead of adding it, preserving existing queued items.

Requirements addressed:
  • 10.2: Backpressure policies with drop newest option

Parameters:
  • threshold (int)

  • agent_id (str)

property policy_name: str

Get the policy name for metrics and logging.

async handle_overflow(queue, new_item)

Handle overflow by dropping the new item.

Return type:

bool

Parameters:
  • queue (deque)

  • new_item (T)

Args:

queue: The overflowing queue
new_item: New item to potentially drop

Returns:

True (item was handled by dropping it)
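
The three overflow strategies described above can be compared side by side with a plain deque. This is a minimal sketch, not the gunn implementation: the function names, the `QueueFullError` exception (a stand-in for BackpressureError), and the choice to treat "depth >= threshold" as overflow are all assumptions made for illustration.

```python
from collections import deque


class QueueFullError(Exception):
    """Hypothetical stand-in for BackpressureError."""


def defer(queue: deque, item, threshold: int) -> bool:
    """Defer: refuse the item by raising so the caller can retry later."""
    if len(queue) >= threshold:
        raise QueueFullError(f"depth {len(queue)} >= threshold {threshold}")
    queue.append(item)
    return True


def shed_oldest(queue: deque, item, threshold: int) -> bool:
    """Shed oldest: add the item, then drop from the front down to threshold."""
    queue.append(item)
    while len(queue) > threshold:
        queue.popleft()
    return True


def drop_newest(queue: deque, item, threshold: int) -> bool:
    """Drop newest: discard the incoming item when the queue is at threshold."""
    if len(queue) < threshold:
        queue.append(item)
    return True


q1 = deque([1, 2, 3])
shed_oldest(q1, 4, threshold=3)  # q1 now holds 2, 3, 4

q2 = deque([1, 2, 3])
drop_newest(q2, 4, threshold=3)  # q2 still holds 1, 2, 3
```

Shedding favors fresh data at the cost of losing history, dropping favors already-queued work, and deferring pushes the decision back to the producer.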

class gunn.utils.backpressure.BackpressureManager

Bases: object

Manager for applying backpressure policies to queues.

Provides a unified interface for managing different backpressure policies and applying them consistently across the system.

Requirements addressed:
  • 10.2: Configurable backpressure policies per agent class
  • 10.5: Queue depth monitoring and backpressure triggers

create_policy(policy_name, threshold, agent_id='unknown')

Create a backpressure policy instance.

Return type:

BackpressurePolicy[Any]

Parameters:
  • policy_name (str)

  • threshold (int)

  • agent_id (str)

Args:

policy_name: Name of the policy (defer, shed_oldest, drop_newest)
threshold: Queue depth threshold
agent_id: Agent identifier for metrics

Returns:

BackpressurePolicy instance

Raises:

ValueError: If policy_name is not recognized

register_policy(name, policy_class)

Register a custom backpressure policy.

Return type:

None

Args:

name: Policy name
policy_class: Policy class implementing BackpressurePolicy

property available_policies: list[str]

Get list of available policy names.

class gunn.utils.backpressure.BackpressureQueue(policy, maxsize=0, queue_type='queue')

Bases: Generic

Queue with integrated backpressure policy support.

A queue implementation that automatically applies backpressure policies when depth thresholds are exceeded.

Requirements addressed:
  • 10.2: Backpressure policies integrated with queue operations
  • 10.5: Automatic backpressure triggers based on queue depth

async put(item)

Put an item in the queue with backpressure handling.

Return type:

None

Parameters:

item (T)

Args:

item: Item to add to queue

Raises:

BackpressureError: If policy defers the operation

async get()

Get an item from the queue.

Return type:

TypeVar(T)

Returns:

Next item from queue

Raises:

asyncio.QueueEmpty: If queue is empty

qsize()

Get current queue size.

Return type:

int

empty()

Check if queue is empty.

Return type:

bool

full()

Check if queue is full (based on maxsize).

Return type:

bool

gunn.utils.errors module

Structured error types for multi-agent simulation.

This module defines error types for various failure scenarios in the simulation system with recovery actions and structured information.

class gunn.utils.errors.RecoveryAction(*values)

Bases: Enum

Recovery actions for error handling.

RETRY = 'retry'
ABORT = 'abort'
REGENERATE = 'regenerate'
RETRY_WITH_DELAY = 'retry_with_delay'
MODIFY_INTENT = 'modify_intent'
DEFER = 'defer'
SHED_OLDEST = 'shed_oldest'
DROP_NEWEST = 'drop_newest'

exception gunn.utils.errors.SimulationError(message, recovery_action=RecoveryAction.ABORT)

Bases: Exception

Base exception for simulation errors.

exception gunn.utils.errors.StaleContextError(req_id, expected_seq, actual_seq, threshold=0)

Bases: SimulationError

Error raised when intent context becomes stale.

This error occurs when an agent's context (view_seq) is outdated compared to the current world state, indicating the intent was based on stale information.

Requirements addressed:
  • 4.2: Staleness detection using latest_view_seq > context_seq + threshold
  • 10.3: Structured error codes including STALE_CONTEXT

Parameters:
  • req_id (str)

  • expected_seq (int)

  • actual_seq (int)

  • threshold (int)
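
The staleness rule quoted in requirement 4.2 is a single comparison. The sketch below restates it as a standalone predicate; the function name `is_stale` is hypothetical, and how gunn maps `expected_seq` and `actual_seq` onto these two sequence numbers is not stated in this reference.

```python
def is_stale(latest_view_seq: int, context_seq: int, threshold: int = 0) -> bool:
    """Return True when the view has advanced more than `threshold` past the context."""
    return latest_view_seq > context_seq + threshold


# With threshold=0, any newer view makes the context stale.
stale_strict = is_stale(latest_view_seq=10, context_seq=8)
# With threshold=2, a lag of exactly 2 is still tolerated.
stale_tolerant = is_stale(latest_view_seq=10, context_seq=8, threshold=2)
```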

exception gunn.utils.errors.IntentConflictError(intent, conflicting_effects)

Bases: SimulationError

Error raised when intent conflicts with existing effects.

This error occurs when an intent cannot be executed due to conflicts with recently processed effects or other intents.

Requirements addressed:
  • 3.4: Intent conflict resolution
  • 10.3: Structured error codes including INTENT_CONFLICT

exception gunn.utils.errors.QuotaExceededError(agent_id, quota_type, limit, current=0)

Bases: SimulationError

Error raised when agent exceeds quota limits.

This error occurs when an agent attempts to perform an action that would exceed their configured quota limits.

Requirements addressed:
  • 1.5: Quota limits and permissions
  • 10.3: Structured error codes including QUOTA_EXCEEDED

exception gunn.utils.errors.TimeoutError(operation, deadline_ms, actual_ms)

Bases: SimulationError

Error raised when operations exceed deadline.

This error occurs when intent processing or generation takes longer than the configured deadline.

Requirements addressed:
  • 3.5: Deadline enforcement for intent processing
  • 10.3: Structured error codes including TIMEOUT

exception gunn.utils.errors.BackpressureError(agent_id, queue_type, current_depth, threshold, policy='defer')

Bases: SimulationError

Error raised when backpressure limits are exceeded.

This error occurs when queue depths or processing rates exceed configured thresholds, triggering backpressure policies.

Requirements addressed:
  • 10.2: Backpressure policies (defer, shed oldest, drop newest)
  • 10.5: Queue depth monitoring and backpressure triggers

Parameters:
  • agent_id (str)

  • queue_type (str)

  • current_depth (int)

  • threshold (int)

  • policy (str)

exception gunn.utils.errors.ValidationError(intent, validation_failures)

Bases: SimulationError

Error raised when intent validation fails.

This error occurs when an intent fails validation checks before being converted to an effect.

Requirements addressed:
  • 1.5: Intent validation through EffectValidator
  • 3.4: Validation for quota limits, cooldowns, and permissions

exception gunn.utils.errors.CircuitBreakerOpenError(component, failure_count, threshold)

Bases: SimulationError

Error raised when circuit breaker is open.

This error occurs when a circuit breaker is in the open state, preventing operations from being executed to avoid cascading failures.

Requirements addressed:
  • 10.5: Circuit breaker patterns per agent or adapter

Parameters:
  • component (str)

  • failure_count (int)

  • threshold (int)

class gunn.utils.errors.CircuitBreaker(failure_threshold=5, recovery_timeout=30.0, half_open_max_calls=3)

Bases: object

Circuit breaker for fault tolerance with failure thresholds.

Implements the circuit breaker pattern to prevent cascading failures by temporarily blocking operations when failure rates exceed thresholds.

Requirements addressed:
  • 10.5: Circuit breaker patterns per agent or adapter

Parameters:
  • failure_threshold (int)

  • recovery_timeout (float)

  • half_open_max_calls (int)

property is_open: bool

Check if circuit breaker is open.

property is_half_open: bool

Check if circuit breaker is half-open.

property is_closed: bool

Check if circuit breaker is closed.

call(func, *args, **kwargs)

Execute function with circuit breaker protection.

Return type:

Any

Args:

func: Function to execute
*args: Function arguments
**kwargs: Function keyword arguments

Returns:

Function result

Raises:

CircuitBreakerOpenError: If circuit breaker is open

async async_call(func, *args, **kwargs)

Execute async function with circuit breaker protection.

Return type:

Any

Args:

func: Async function to execute
*args: Function arguments
**kwargs: Function keyword arguments

Returns:

Function result

Raises:

CircuitBreakerOpenError: If circuit breaker is open
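
The closed, open, and half-open states can be illustrated with a small state machine. This is a sketch of the general circuit breaker pattern under stated assumptions, not gunn's code: the class `SimpleCircuitBreaker`, the `CircuitOpenError` exception, the injectable `clock`, and the rule that one success in half-open closes the circuit are all choices made for this example.

```python
import time


class CircuitOpenError(Exception):
    """Hypothetical stand-in for CircuitBreakerOpenError."""


class SimpleCircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=30.0,
                 half_open_max_calls=3, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_max_calls = half_open_max_calls
        self._clock = clock  # injectable so the timeout can be tested
        self._state = "closed"
        self._failures = 0
        self._opened_at = 0.0
        self._half_open_calls = 0

    @property
    def state(self):
        # An open circuit becomes half-open once the recovery timeout elapses.
        if (self._state == "open"
                and self._clock() - self._opened_at >= self.recovery_timeout):
            self._state = "half_open"
            self._half_open_calls = 0
        return self._state

    def call(self, func, *args, **kwargs):
        state = self.state
        if state == "open":
            raise CircuitOpenError("circuit is open")
        if state == "half_open":
            if self._half_open_calls >= self.half_open_max_calls:
                raise CircuitOpenError("half-open trial budget exhausted")
            self._half_open_calls += 1
        try:
            result = func(*args, **kwargs)
        except Exception:
            self._record_failure()
            raise
        self._record_success()
        return result

    def _record_failure(self):
        self._failures += 1
        # Any failure while half-open, or too many while closed, opens the circuit.
        if self._state == "half_open" or self._failures >= self.failure_threshold:
            self._state = "open"
            self._opened_at = self._clock()

    def _record_success(self):
        self._failures = 0
        self._state = "closed"
```

Passing a fake clock instead of `time.monotonic` makes the open to half-open transition deterministic in tests.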

class gunn.utils.errors.ErrorRecoveryPolicy(max_retries=3, retry_delay_ms=100.0, backoff_multiplier=2.0, max_delay_ms=5000.0)

Bases: object

Policy for handling different types of errors with recovery strategies.

Provides configurable recovery strategies for different error types to enable graceful degradation and automatic recovery.

Requirements addressed:
  • 10.4: Partial failures remain consistent with event log
  • 10.5: Circuit breaker patterns and error recovery

Parameters:
  • max_retries (int)

  • retry_delay_ms (float)

  • backoff_multiplier (float)

  • max_delay_ms (float)

handle_stale_context(error)

Handle stale context error.

Return type:

RecoveryAction

Parameters:

error (StaleContextError)

Args:

error: The stale context error

Returns:

Recovery action (typically REGENERATE)

handle_intent_conflict(error)

Handle intent conflict error.

Return type:

RecoveryAction

Parameters:

error (IntentConflictError)

Args:

error: The intent conflict error

Returns:

Recovery action (typically RETRY_WITH_DELAY)

handle_quota_exceeded(error)

Handle quota exceeded error.

Return type:

RecoveryAction

Parameters:

error (QuotaExceededError)

Args:

error: The quota exceeded error

Returns:

Recovery action (typically DEFER)

handle_timeout(error)

Handle timeout error.

Return type:

RecoveryAction

Parameters:

error (TimeoutError)

Args:

error: The timeout error

Returns:

Recovery action (typically ABORT)

handle_backpressure(error)

Handle backpressure error.

Return type:

RecoveryAction

Parameters:

error (BackpressureError)

Args:

error: The backpressure error

Returns:

Recovery action based on backpressure policy

handle_validation(error)

Handle validation error.

Return type:

RecoveryAction

Parameters:

error (ValidationError)

Args:

error: The validation error

Returns:

Recovery action (typically ABORT)

handle_circuit_breaker(error)

Handle circuit breaker open error.

Return type:

RecoveryAction

Parameters:

error (CircuitBreakerOpenError)

Args:

error: The circuit breaker error

Returns:

Recovery action (typically RETRY_WITH_DELAY)

calculate_retry_delay(attempt)

Calculate retry delay with exponential backoff.

Return type:

float

Parameters:

attempt (int)

Args:

attempt: Retry attempt number (0-based)

Returns:

Delay in milliseconds
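
Capped exponential backoff is commonly computed as the base delay times the multiplier raised to the attempt number, clamped to the maximum. The sketch below uses the constructor defaults listed above; the function name `backoff_delay_ms` is hypothetical, and whether gunn applies exactly this formula (for example, with or without jitter) is an assumption.

```python
def backoff_delay_ms(attempt: int,
                     retry_delay_ms: float = 100.0,
                     backoff_multiplier: float = 2.0,
                     max_delay_ms: float = 5000.0) -> float:
    """Delay for a 0-based attempt: base * multiplier**attempt, capped at the maximum."""
    return min(retry_delay_ms * backoff_multiplier ** attempt, max_delay_ms)


# Attempts 0..6 double each time until the cap takes over.
delays = [backoff_delay_ms(attempt) for attempt in range(7)]
```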

should_retry(error, attempt)

Determine if error should be retried.

Return type:

bool

Args:

error: The error that occurred
attempt: Current attempt number (0-based)

Returns:

True if should retry, False otherwise

exception gunn.utils.errors.LLMGenerationError(message, provider='unknown', model='unknown')

Bases: SimulationError

Error raised when LLM generation fails.

This error occurs when LLM token generation encounters an unrecoverable error during streaming.

Requirements addressed:
  • 6.1: Proper error handling for generation failures
  • 10.3: Structured error codes for LLM failures

exception gunn.utils.errors.LLMTimeoutError(message, timeout_seconds, provider='unknown')

Bases: SimulationError

Error raised when LLM generation times out.

This error occurs when LLM generation exceeds the configured timeout duration.

Requirements addressed:
  • 6.3: Timeout handling for LLM generation
  • 10.3: Structured error codes for timeout scenarios

gunn.utils.hashing module

Hash chain utilities for log integrity.

This module provides utilities for maintaining integrity of event logs through hash chaining and canonical JSON serialization.

gunn.utils.hashing.canonical_json(obj)

Generate canonical JSON serialization for consistent hashing.

Uses orjson with sorted keys to ensure deterministic output regardless of dictionary key ordering.

Return type:

bytes

Parameters:

obj (dict[str, Any])

Args:

obj: Dictionary to serialize

Returns:

Canonical JSON bytes representation

Example:
>>> data = {"b": 2, "a": 1}
>>> canonical_json(data)
b'{"a":1,"b":2}'

gunn.utils.hashing.chain_checksum(effect, prev_checksum=None)

Generate hash chain checksum using SHA-256.

Creates a hash chain by combining the previous checksum with the canonical JSON representation of the current effect.

Return type:

str

Args:

effect: Effect dictionary to hash
prev_checksum: Previous checksum in the chain (None for first entry)

Returns:

Hexadecimal SHA-256 hash string

Example:
>>> effect = {"kind": "test", "payload": {}}
>>> chain_checksum(effect)
'a1b2c3...'
>>> chain_checksum(effect, "prev_hash")
'd4e5f6...'

gunn.utils.hashing.validate_hash_chain(entries)

Validate integrity of a hash chain sequence.

Verifies that each entry's checksum correctly follows from the previous entry's checksum and the entry's effect data.

Return type:

bool

Parameters:

entries (list[dict[str, Any]])

Args:

entries: List of log entries, each containing 'effect' and 'checksum' fields

Returns:

True if chain is valid, False if corruption detected

Example:
>>> entries = [
...     {"effect": {"kind": "test1"}, "checksum": "abc123"},
...     {"effect": {"kind": "test2"}, "checksum": "def456"}
... ]
>>> validate_hash_chain(entries)
True

gunn.utils.hashing.detect_corruption(entries)

Detect corrupted entries in a hash chain.

Returns indices of entries that have invalid checksums, indicating potential corruption or tampering.

Return type:

list[int]

Parameters:

entries (list[dict[str, Any]])

Args:

entries: List of log entries with 'effect' and 'checksum' fields

Returns:

List of indices where corruption was detected

Example:
>>> entries = [
...     {"effect": {"kind": "test1"}, "checksum": "valid_hash"},
...     {"effect": {"kind": "test2"}, "checksum": "invalid_hash"}
... ]
>>> detect_corruption(entries)
[1]

gunn.utils.hashing.verify_sequence_integrity(entries)

Comprehensive integrity verification of a log sequence.

Performs multiple integrity checks including hash chain validation, sequence gap detection, and corruption analysis.

Return type:

dict[str, Any]

Parameters:

entries (list[dict[str, Any]])

Args:

entries: List of log entries with required fields

Returns:

Dictionary containing integrity report with:
  • valid: Overall validity boolean
  • corrupted_entries: List of corrupted entry indices
  • missing_sequences: List of detected sequence gaps
  • total_entries: Total number of entries checked

Example:
>>> entries = [{"effect": {...}, "checksum": "...", "global_seq": 1}]
>>> verify_sequence_integrity(entries)
{"valid": True, "corrupted_entries": [], "missing_sequences": [], "total_entries": 1}
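
The hash-chain functions in this module can be approximated with the standard library alone. The following sketch is illustrative only: it uses `json.dumps` in place of orjson, and the order in which the previous checksum and the effect bytes are fed to SHA-256 is an assumption, so its digests will not necessarily match those produced by gunn. All `sketch_*` names are hypothetical.

```python
import hashlib
import json
from typing import Any, Optional


def sketch_canonical_json(obj: dict) -> bytes:
    """Serialize with sorted keys and no whitespace so equal dicts hash equally."""
    return json.dumps(obj, sort_keys=True, separators=(",", ":")).encode("utf-8")


def sketch_chain_checksum(effect: dict, prev_checksum: Optional[str] = None) -> str:
    """SHA-256 over the previous checksum (if any) followed by the effect bytes."""
    hasher = hashlib.sha256()
    if prev_checksum is not None:
        hasher.update(prev_checksum.encode("utf-8"))
    hasher.update(sketch_canonical_json(effect))
    return hasher.hexdigest()


def sketch_detect_corruption(entries: list) -> list:
    """Return indices whose stored checksum does not match the recomputed one."""
    corrupted = []
    prev = None
    for index, entry in enumerate(entries):
        if entry["checksum"] != sketch_chain_checksum(entry["effect"], prev):
            corrupted.append(index)
        prev = entry["checksum"]  # continue from the stored value
    return corrupted


# Build a three-entry chain, then tamper with the middle effect.
entries: list = []
prev_checksum = None
for kind in ("test1", "test2", "test3"):
    effect: dict = {"kind": kind}
    prev_checksum = sketch_chain_checksum(effect, prev_checksum)
    entries.append({"effect": effect, "checksum": prev_checksum})

clean_report = sketch_detect_corruption(entries)
entries[1]["effect"]["kind"] = "tampered"
tampered_report = sketch_detect_corruption(entries)
```

Because each check continues from the stored checksum rather than the recomputed one, only the tampered entry is flagged instead of every entry after it.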

gunn.utils.hashing_demo module

gunn.utils.memory module

Memory management and compaction utilities for the simulation core.

This module provides memory management capabilities including WorldState snapshots, log compaction, view cache eviction with LRU policy, and memory usage tracking.

class gunn.utils.memory.MemoryStats(**data)

Bases: BaseModel

Memory usage statistics.

Parameters:
  • total_log_entries (int)

  • total_snapshots (int)

  • view_cache_size (int)

  • estimated_memory_mb (float)

  • oldest_entry_age_seconds (float)

  • newest_entry_age_seconds (float)

  • compaction_eligible_entries (int)

total_log_entries: int
total_snapshots: int
view_cache_size: int
estimated_memory_mb: float
oldest_entry_age_seconds: float
newest_entry_age_seconds: float
compaction_eligible_entries: int
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class gunn.utils.memory.MemoryConfig(**data)

Bases: BaseModel

Configuration for memory management.

Parameters:
  • max_log_entries (int)

  • view_cache_size (int)

  • compaction_threshold (int)

  • snapshot_interval (int)

  • max_snapshots (int)

  • memory_check_interval_seconds (float)

  • auto_compaction_enabled (bool)

max_log_entries: int
view_cache_size: int
compaction_threshold: int
snapshot_interval: int
max_snapshots: int
memory_check_interval_seconds: float
auto_compaction_enabled: bool
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class gunn.utils.memory.WorldStateSnapshot(**data)

Bases: BaseModel

Snapshot of world state at a specific point in time.

global_seq: int
sim_time: float
wall_time: float
world_state: WorldState
checksum: str
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class gunn.utils.memory.ViewCache(max_size=1000)

Bases: object

LRU cache for agent views with configurable size limits.

Parameters:

max_size (int)

get(key)

Get cached view and mark as recently used.

Return type:

Any

Parameters:

key (str)

Args:

key: Cache key (typically agent_id:view_seq)

Returns:

Cached view or None if not found

put(key, value)

Store view in cache with LRU eviction.

Return type:

None

Args:

key: Cache key
value: View to cache

evict_old_entries(max_age_seconds=3600.0)

Evict entries older than specified age.

Return type:

int

Parameters:

max_age_seconds (float)

Args:

max_age_seconds: Maximum age for cached entries

Returns:

Number of entries evicted

clear()

Clear all cached entries.

Return type:

None

get_stats()

Get cache statistics.

Return type:

dict[str, Any]

Returns:

Dictionary with cache statistics
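
The get/put behavior of an LRU cache can be sketched with `collections.OrderedDict`. This is a generic illustration under the assumption that the least recently used entry is evicted when `max_size` is exceeded; the class name `LRUViewCache` is hypothetical, and age-based eviction and statistics are left out.

```python
from collections import OrderedDict


class LRUViewCache:
    def __init__(self, max_size: int = 1000):
        self.max_size = max_size
        self._entries = OrderedDict()  # oldest use first, newest use last

    def get(self, key: str):
        """Return the cached value and mark it as most recently used."""
        if key not in self._entries:
            return None
        self._entries.move_to_end(key)
        return self._entries[key]

    def put(self, key: str, value) -> None:
        """Store the value, evicting least recently used entries when over capacity."""
        if key in self._entries:
            self._entries.move_to_end(key)
        self._entries[key] = value
        while len(self._entries) > self.max_size:
            self._entries.popitem(last=False)


cache = LRUViewCache(max_size=2)
cache.put("agent_a:1", "view-a")
cache.put("agent_b:1", "view-b")
cache.get("agent_a:1")            # touch agent_a so agent_b becomes the oldest
cache.put("agent_c:1", "view-c")  # evicts agent_b:1
```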

class gunn.utils.memory.SnapshotManager(config)

Bases: object

Manages WorldState snapshots for faster replay.

Parameters:

config (MemoryConfig)

async create_snapshot(global_seq, world_state, sim_time=0.0)

Create a new WorldState snapshot.

Return type:

WorldStateSnapshot

Args:

global_seq: Global sequence number for this snapshot
world_state: Current world state to snapshot
sim_time: Simulation time

Returns:

Created snapshot

find_nearest_snapshot(target_seq)

Find the snapshot closest to but not exceeding target sequence.

Return type:

WorldStateSnapshot | None

Parameters:

target_seq (int)

Args:

target_seq: Target global sequence number

Returns:

Nearest snapshot or None if no suitable snapshot exists
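
Finding the closest snapshot at or below a target sequence is a "rightmost value less than or equal to" lookup, which `bisect` handles when the sequence numbers are kept sorted. The sketch below works on bare integers; the function name `nearest_seq_at_or_below` is hypothetical, and nothing here reflects how SnapshotManager stores its snapshots internally.

```python
import bisect
from typing import Optional


def nearest_seq_at_or_below(sorted_seqs: list, target_seq: int) -> Optional[int]:
    """Return the largest sequence number <= target_seq, or None if there is none."""
    index = bisect.bisect_right(sorted_seqs, target_seq)
    return sorted_seqs[index - 1] if index > 0 else None


snapshot_seqs = [100, 200, 300]
replay_start = nearest_seq_at_or_below(snapshot_seqs, 250)  # 200
```

Replay then only needs to apply the log entries after the returned sequence number up to the target.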

get_all_snapshots()

Get all snapshots ordered by global_seq.

Return type:

list[WorldStateSnapshot]

Returns:

List of all snapshots

async validate_snapshot_integrity(snapshot)

Validate snapshot integrity using checksum.

Return type:

bool

Parameters:

snapshot (WorldStateSnapshot)

Args:

snapshot: Snapshot to validate

Returns:

True if snapshot is valid

get_stats()

Get snapshot statistics.

Return type:

dict[str, Any]

Returns:

Dictionary with snapshot statistics

class gunn.utils.memory.MemoryManager(config)

Bases: object

Central memory management for the simulation core.

Handles WorldState snapshots, log compaction, view cache eviction, and memory usage monitoring with configurable limits.

Requirements addressed:
  • 7.3: WorldState snapshot creation every N events for faster replay
  • 11.4: Memory limits and compaction correctness

Parameters:

config (MemoryConfig)

async check_and_create_snapshot(current_seq, world_state, sim_time=0.0)

Check if a snapshot should be created and create it if needed.

Return type:

WorldStateSnapshot | None

Args:

current_seq: Current global sequence number
world_state: Current world state
sim_time: Current simulation time

Returns:

Created snapshot or None if no snapshot was needed

async compact_log(event_log)

Compact old log entries while preserving replay capability.

Removes old entries from the log while ensuring that replay remains possible from snapshots. Compaction runs only if there are enough snapshots to maintain replay integrity.

Return type:

int

Parameters:

event_log (EventLog)

Args:

event_log: Event log to compact

Returns:

Number of entries removed

evict_old_views(max_age_seconds=3600.0)

Remove old cached views to free memory.

Return type:

int

Parameters:

max_age_seconds (float)

Args:

max_age_seconds: Maximum age for cached views

Returns:

Number of views evicted

async check_memory_limits(event_log)

Check if memory limits are exceeded and trigger compaction if needed.

Return type:

bool

Parameters:

event_log (EventLog)

Args:

event_log: Event log to check

Returns:

True if memory is within limits

estimate_memory_usage(event_log)

Return current memory usage statistics.

Return type:

MemoryStats

Parameters:

event_log (EventLog)

Args:

event_log: Event log to analyze

Returns:

Memory usage statistics

get_detailed_stats(event_log)

Get comprehensive memory management statistics.

Return type:

dict[str, Any]

Parameters:

event_log (EventLog)

Args:

event_log: Event log to analyze

Returns:

Detailed statistics dictionary

async cleanup()

Cleanup memory manager resources.

Return type:

None

gunn.utils.metrics_exporter module

Metrics exporter with feature flag status.

This module provides functionality to export feature flag status and other configuration metrics for operational visibility.

class gunn.utils.metrics_exporter.MetricsExporter(config)

Bases: object

Exports configuration and feature flag metrics.

Parameters:

config (Config)

export_feature_flags(features)

Export feature flag status as metrics.

Return type:

None

Parameters:

features (FeatureFlags)

Args:

features: Feature flags to export

export_config_info(config)

Export configuration information as metrics.

Return type:

None

Parameters:

config (Config)

Args:

config: Configuration to export

export_system_info()

Export system information as metrics.

Return type:

None

record_startup_time(startup_duration)

Record service startup time.

Return type:

None

Parameters:

startup_duration (float)

Args:

startup_duration: Time taken to start the service in seconds

record_config_reload()

Record a configuration reload event.

Return type:

None

record_config_validation_time(validation_duration)

Record configuration validation time.

Return type:

None

Parameters:

validation_duration (float)

Args:

validation_duration: Time taken to validate config in seconds

should_export()

Check if metrics should be exported based on interval.

Return type:

bool

Returns:

True if enough time has passed since last export

export_all()

Export all metrics if interval has passed.

Return type:

None

gunn.utils.metrics_exporter.create_metrics_exporter(config)

Create and initialize metrics exporter.

Return type:

MetricsExporter

Parameters:

config (Config)

Args:

config: Configuration to use for metrics export

Returns:

Configured metrics exporter

gunn.utils.metrics_exporter.get_metrics_text()

Get metrics in Prometheus text format.

Return type:

str

Returns:

Metrics as text string

gunn.utils.scheduling module

Scheduling utilities for fair intent processing.

This module provides scheduling algorithms for fair processing of intents across multiple agents, including weighted round robin for fairness.

class gunn.utils.scheduling.IntentScheduler(*args, **kwargs)

Bases: Protocol

Protocol for intent scheduling algorithms.

enqueue(intent, priority=0)

Enqueue an intent for processing.

Return type:

None

Args:

intent: Intent to enqueue
priority: Priority level (higher = more important)

dequeue()

Dequeue the next intent for processing.

Return type:

Intent | None

Returns:

Next intent to process, or None if queue is empty

get_queue_depth(agent_id=None)

Get queue depth for specific agent or total.

Return type:

int

Parameters:

agent_id (str | None)

Args:

agent_id: Agent ID to check, or None for total depth

Returns:

Queue depth

get_stats()

Get scheduler statistics.

Return type:

dict[str, Any]

Returns:

Dictionary with scheduler statistics

class gunn.utils.scheduling.WeightedRoundRobinScheduler(default_weight=1, max_queue_depth=100, priority_levels=3)

Bases: object

Weighted Round Robin scheduler for fair intent processing.

Provides fair scheduling across agents with configurable weights and priority levels. Ensures no agent can monopolize processing while respecting priority and fairness constraints.

Requirements addressed:
  • 3.4: Priority and fairness policies
  • 9: Fairness (Weighted Round Robin) in intent validation pipeline

Parameters:
  • default_weight (int)

  • max_queue_depth (int)

  • priority_levels (int)

set_agent_weight(agent_id, weight)

Set weight for specific agent.

Return type:

None

Args:

agent_id: Agent identifier
weight: Weight for this agent (higher = more processing time)

Raises:

ValueError: If weight is not positive

enqueue(intent, priority=0)

Enqueue an intent for processing.

Return type:

None

Args:

intent: Intent to enqueue
priority: Priority level (0 = highest priority)

Raises:

ValueError: If priority is invalid or queue is full

dequeue()

Dequeue the next intent using weighted round robin.

Return type:

Intent | None

Returns:

Next intent to process, or None if all queues are empty

get_queue_depth(agent_id=None)

Get queue depth for specific agent or total.

Return type:

int

Parameters:

agent_id (str | None)

Args:

agent_id: Agent ID to check, or None for total depth

Returns:

Queue depth

get_agent_queue_depths()

Get queue depths for all agents.

Return type:

dict[str, int]

Returns:

Dictionary mapping agent_id to queue depth

get_priority_distribution(agent_id)

Get distribution of intents by priority for an agent.

Return type:

dict[int, int]

Parameters:

agent_id (str)

Args:

agent_id: Agent identifier

Returns:

Dictionary mapping priority to count

get_stats()

Get scheduler statistics.

Return type:

dict[str, Any]

Returns:

Dictionary with comprehensive scheduler statistics

remove_agent(agent_id)

Remove agent and return number of dropped intents.

Return type:

int

Parameters:

agent_id (str)

Args:

agent_id: Agent to remove

Returns:

Number of intents that were dropped

clear()

Clear all queues and return number of dropped intents.

Return type:

int

Returns:

Total number of intents that were dropped
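
Weighted round robin gives each agent a turn in which it may dequeue up to `weight` items before the next agent is served. The sketch below shows that idea only; it is not gunn's scheduler. The class `TinyWeightedRoundRobin`, the `enqueue(agent_id, intent)` signature, and the omission of priority levels and queue depth limits are all simplifying assumptions.

```python
from collections import deque


class TinyWeightedRoundRobin:
    def __init__(self, default_weight: int = 1):
        self.default_weight = default_weight
        self._weights = {}
        self._queues = {}   # agent_id -> deque of pending intents
        self._order = []    # agents in the order they first enqueued
        self._index = 0     # position of the agent whose turn it is
        self._credit = 0    # dequeues left in the current agent's turn

    def set_agent_weight(self, agent_id: str, weight: int) -> None:
        if weight <= 0:
            raise ValueError("weight must be positive")
        self._weights[agent_id] = weight

    def enqueue(self, agent_id: str, intent) -> None:
        if agent_id not in self._queues:
            self._queues[agent_id] = deque()
            self._order.append(agent_id)
        self._queues[agent_id].append(intent)

    def dequeue(self):
        if not any(self._queues.values()):
            return None  # every per-agent queue is empty
        while True:
            agent_id = self._order[self._index]
            queue = self._queues[agent_id]
            if self._credit == 0:
                # Start of a turn: grant this agent its weight in credits.
                self._credit = self._weights.get(agent_id, self.default_weight)
            if queue:
                self._credit -= 1
                intent = queue.popleft()
                if self._credit == 0:
                    self._advance()
                return intent
            # Nothing queued: forfeit the rest of the turn.
            self._credit = 0
            self._advance()

    def _advance(self) -> None:
        self._index = (self._index + 1) % len(self._order)


scheduler = TinyWeightedRoundRobin()
scheduler.set_agent_weight("a", 2)
for intent in ("a1", "a2", "a3"):
    scheduler.enqueue("a", intent)
for intent in ("b1", "b2"):
    scheduler.enqueue("b", intent)

served = [scheduler.dequeue() for _ in range(5)]
```

With agent "a" weighted 2 and agent "b" at the default weight of 1, "a" is served two intents for each one served to "b" while both have work queued.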

class gunn.utils.scheduling.PriorityScheduler(priority_levels=3, max_queue_depth=1000)

Bases: object

Simple priority-based scheduler without fairness.

Processes intents strictly by priority level, which can lead to starvation of lower-priority intents but ensures highest priority intents are always processed first.

Parameters:
  • priority_levels (int)

  • max_queue_depth (int)

enqueue(intent, priority=0)

Enqueue intent by priority.

Return type:

None

Args:

intent: Intent to enqueue
priority: Priority level (0 = highest)

Raises:

ValueError: If priority is invalid or queue is full

dequeue()

Dequeue highest priority intent.

Return type:

Intent | None

Returns:

Next intent to process, or None if all queues are empty

get_queue_depth(agent_id=None)

Get total queue depth.

Return type:

int

Parameters:

agent_id (str | None)

Args:

agent_id: Ignored for priority scheduler

Returns:

Total queue depth

get_stats()

Get scheduler statistics.

Return type:

dict[str, Any]

Returns:

Dictionary with scheduler statistics

gunn.utils.telemetry module

Telemetry utilities for logging, metrics, and tracing.

This module provides centralized observability infrastructure including:
  • Structured logging with PII redaction
  • Prometheus metrics collection
  • OpenTelemetry tracing setup
  • Performance measurement utilities
  • Memory usage tracking and reporting
  • Bandwidth/CPU measurement for patch operations

gunn.utils.telemetry.redact_pii(text)

Redact personally identifiable information from text.

Return type:

Any

Parameters:

text (Any)

Args:

text: Input text that may contain PII

Returns:

Text with PII patterns replaced with [REDACTED_<type>], or original input if not a string

Example:
>>> redact_pii("Contact john@example.com or call 555-123-4567")
'Contact [REDACTED_EMAIL] or call [REDACTED_PHONE]'
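
Pattern-based redaction of this kind is typically a set of labeled regular expressions applied in sequence. The sketch below reproduces the example above with two deliberately simple patterns; the function name `sketch_redact_pii` and both regular expressions are assumptions for illustration, and the patterns gunn actually uses are not shown in this reference.

```python
import re

# Illustrative patterns only; real-world PII matching needs broader coverage.
_PII_PATTERNS = {
    "EMAIL": re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}"),
    "PHONE": re.compile(r"\b\d{3}[-.\s]\d{3}[-.\s]\d{4}\b"),
}


def sketch_redact_pii(text):
    """Replace each matched pattern with [REDACTED_<type>]; pass non-strings through."""
    if not isinstance(text, str):
        return text
    for label, pattern in _PII_PATTERNS.items():
        text = pattern.sub(f"[REDACTED_{label}]", text)
    return text


redacted = sketch_redact_pii("Contact john@example.com or call 555-123-4567")
```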

gunn.utils.telemetry.pii_redaction_processor(logger, method_name, event_dict)

Structlog processor to redact PII from log events.

Return type:

dict[str, Any]

Args:

logger: Logger instance (unused)
method_name: Log method name (unused)
event_dict: Event dictionary to process

Returns:

Event dictionary with PII redacted from string values

gunn.utils.telemetry.setup_logging(log_level='INFO', enable_pii_redaction=True)

Initialize structured logging with PII redaction.

Return type:

None

Parameters:
  • log_level (str)

  • enable_pii_redaction (bool)

Args:

log_level: Logging level (DEBUG, INFO, WARNING, ERROR)
enable_pii_redaction: Whether to enable PII redaction processor

gunn.utils.telemetry.setup_tracing(service_name='gunn', otlp_endpoint=None, enable_fastapi_instrumentation=True)

Initialize OpenTelemetry tracing.

Return type:

None

Parameters:
  • service_name (str)

  • otlp_endpoint (str | None)

  • enable_fastapi_instrumentation (bool)

Args:

service_name: Name of the service for tracing
otlp_endpoint: OTLP endpoint URL (if None, uses console exporter)
enable_fastapi_instrumentation: Whether to enable FastAPI auto-instrumentation

gunn.utils.telemetry.get_tracer(name)

Get OpenTelemetry tracer for a component.

Return type:

Tracer

Parameters:

name (str)

Args:

name: Tracer name (typically module name)

Returns:

Tracer instance

gunn.utils.telemetry.get_logger(name, **context)[ソース]

Get a structured logger with optional context.

戻り値の型:

Any

Args:

  • name: Logger name

  • **context: Additional context to bind to logger

Returns:

Bound logger with context

gunn.utils.telemetry.log_operation(logger, operation, status='success', global_seq=None, view_seq=None, agent_id=None, req_id=None, latency_ms=None, **extra_context)[ソース]

Log an operation with standardized fields for observability.

戻り値の型:

None

パラメータ:
  • logger (Any)

  • operation (str)

  • status (str)

  • global_seq (int | None)

  • view_seq (int | None)

  • agent_id (str | None)

  • req_id (str | None)

  • latency_ms (float | None)

  • extra_context (Any)

Args:

  • logger: Structured logger instance

  • operation: Operation name

  • status: Operation status (success, error, warning)

  • global_seq: Global sequence number

  • view_seq: View sequence number

  • agent_id: Agent identifier

  • req_id: Request identifier

  • latency_ms: Operation latency in milliseconds

  • **extra_context: Additional context fields

class gunn.utils.telemetry.PerformanceTimer(operation, agent_id=None, req_id=None, global_seq=None, view_seq=None, logger=None, record_metrics=True, create_span=True, tracer_name='gunn.performance')[ソース]

ベースクラス: object

Context manager for measuring operation performance.

Automatically records metrics, logs timing information, and creates tracing spans.

パラメータ:
  • operation (str)

  • agent_id (str | None)

  • req_id (str | None)

  • global_seq (int | None)

  • view_seq (int | None)

  • logger (BoundLogger | None)

  • record_metrics (bool)

  • create_span (bool)

  • tracer_name (str)

property duration: float | None

Get operation duration in seconds.
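A minimal sketch of the context-manager idea, assuming only the timing part: SketchTimer is a hypothetical name, and the metrics, logging, and span creation done by the real class are left out. It shows why duration is typed float | None: the value exists only once the block has finished.

```python
import time
from typing import Optional


class SketchTimer:
    """Measure the wall-clock duration of a with-block (timing only)."""

    def __init__(self, operation: str) -> None:
        self.operation = operation
        self._start: Optional[float] = None
        self._end: Optional[float] = None

    def __enter__(self) -> "SketchTimer":
        self._start = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        # Record the end time even when the block raised.
        self._end = time.perf_counter()

    @property
    def duration(self) -> Optional[float]:
        """Seconds elapsed, or None if the block has not completed yet."""
        if self._start is None or self._end is None:
            return None
        return self._end - self._start
```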

gunn.utils.telemetry.async_performance_timer(operation, agent_id=None, req_id=None, global_seq=None, view_seq=None, logger=None, record_metrics=True, create_span=True, tracer_name='gunn.performance')[ソース]

Async context manager for measuring operation performance.

戻り値の型:

AsyncGenerator[PerformanceTimer, None]

パラメータ:
  • operation (str)

  • agent_id (str | None)

  • req_id (str | None)

  • global_seq (int | None)

  • view_seq (int | None)

  • logger (BoundLogger | None)

  • record_metrics (bool)

  • create_span (bool)

  • tracer_name (str)

Args:

  • operation: Operation name for metrics/logging

  • agent_id: Agent identifier (optional)

  • req_id: Request identifier (optional)

  • global_seq: Global sequence number (optional)

  • view_seq: View sequence number (optional)

  • logger: Logger instance (optional)

  • record_metrics: Whether to record Prometheus metrics

  • create_span: Whether to create tracing span

  • tracer_name: Tracer name for spans

Yields:

PerformanceTimer instance

gunn.utils.telemetry.record_queue_depth(agent_id, depth)[ソース]

Record queue depth metric for an agent.

戻り値の型:

None

Args:

  • agent_id: Agent identifier

  • depth: Current queue depth

gunn.utils.telemetry.record_cancellation(agent_id, reason)[ソース]

Record a cancellation event.

戻り値の型:

None

Args:

  • agent_id: Agent identifier

  • reason: Cancellation reason

gunn.utils.telemetry.record_queue_high_watermark(agent_id, queue_type, depth)[ソース]

Record queue depth high watermark for backpressure monitoring.

戻り値の型:

None

パラメータ:
  • agent_id (str)

  • queue_type (str)

  • depth (int)

Args:

  • agent_id: Agent identifier

  • queue_type: Type of queue (agent_queue, system_queue, etc.)

  • depth: Queue depth that triggered high watermark

gunn.utils.telemetry.record_backpressure_event(agent_id, queue_type, policy)[ソース]

Record a backpressure event.

戻り値の型:

None

パラメータ:
  • agent_id (str)

  • queue_type (str)

  • policy (str)

Args:

  • agent_id: Agent identifier

  • queue_type: Type of queue that triggered backpressure

  • policy: Backpressure policy applied (defer, shed_oldest, drop_newest)

gunn.utils.telemetry.record_circuit_breaker_state_change(component, from_state, to_state)[ソース]

Record circuit breaker state change.

戻り値の型:

None

パラメータ:
  • component (str)

  • from_state (str)

  • to_state (str)

Args:

  • component: Component name with circuit breaker

  • from_state: Previous state (CLOSED, OPEN, HALF_OPEN)

  • to_state: New state (CLOSED, OPEN, HALF_OPEN)

gunn.utils.telemetry.record_error_recovery_action(error_type, recovery_action, agent_id='unknown')[ソース]

Record an error recovery action.

戻り値の型:

None

パラメータ:
  • error_type (str)

  • recovery_action (str)

  • agent_id (str)

Args:

  • error_type: Type of error that occurred

  • recovery_action: Recovery action taken

  • agent_id: Agent identifier (optional)

gunn.utils.telemetry.record_intent_throughput(agent_id, intent_kind, status='success')[ソース]

Record intent processing throughput.

戻り値の型:

None

パラメータ:
  • agent_id (str)

  • intent_kind (str)

  • status (str)

Args:

  • agent_id: Agent identifier

  • intent_kind: Type of intent (Speak, Move, etc.)

  • status: Processing status (success, error, conflict)

gunn.utils.telemetry.record_conflict(agent_id, conflict_type)[ソース]

Record an intent conflict.

戻り値の型:

None

パラメータ:
  • agent_id (str)

  • conflict_type (str)

Args:

  • agent_id: Agent identifier

  • conflict_type: Type of conflict (staleness, validation, quota)

gunn.utils.telemetry.record_observation_delivery_latency(agent_id, latency_seconds)[ソース]

Record observation delivery latency.

戻り値の型:

None

パラメータ:
  • agent_id (str)

  • latency_seconds (float)

Args:

  • agent_id: Agent identifier

  • latency_seconds: Time from effect creation to observation delivery

gunn.utils.telemetry.update_global_seq(seq)[ソース]

Update the current global sequence number metric.

戻り値の型:

None

パラメータ:

seq (int)

Args:

seq: Current global sequence number

gunn.utils.telemetry.update_view_seq(agent_id, seq)[ソース]

Update the current view sequence number for an agent.

戻り値の型:

None

Args:

  • agent_id: Agent identifier

  • seq: Current view sequence number

gunn.utils.telemetry.update_active_agents_count(count)[ソース]

Update the number of active agents.

戻り値の型:

None

パラメータ:

count (int)

Args:

count: Number of currently active agents

gunn.utils.telemetry.start_metrics_server(port=8000)[ソース]

Start Prometheus metrics HTTP server.

戻り値の型:

None

パラメータ:

port (int)

Args:

port: Port to serve metrics on

class gunn.utils.telemetry.MonotonicClock[ソース]

ベースクラス: object

Monotonic clock for internal timing measurements.

Uses asyncio event loop's monotonic time for consistent timing that's not affected by system clock adjustments.

static now()[ソース]

Get current monotonic time in seconds.

戻り値の型:

float

Returns:

Current time from asyncio event loop's monotonic clock

static wall_time()[ソース]

Get current wall clock time for display purposes.

戻り値の型:

float

Returns:

Current wall clock time in seconds since epoch

gunn.utils.telemetry.get_timing_context()[ソース]

Get current timing context for logging.

戻り値の型:

dict[str, float]

Returns:

Dictionary with monotonic_time and wall_time

class gunn.utils.telemetry.SystemMonitor[ソース]

ベースクラス: object

Monitor system resources and record metrics.

record_memory_usage(component='system')[ソース]

Record current memory usage and return bytes used.

戻り値の型:

float

パラメータ:

component (str)

Args:

component: Component name for metrics labeling

Returns:

Memory usage in bytes

record_cpu_usage(component='system')[ソース]

Record current CPU usage and return percentage.

戻り値の型:

float

パラメータ:

component (str)

Args:

component: Component name for metrics labeling

Returns:

CPU usage percentage

get_system_stats()[ソース]

Get comprehensive system statistics.

戻り値の型:

dict[str, Any]

Returns:

Dictionary with system resource information

class gunn.utils.telemetry.BandwidthMonitor[ソース]

ベースクラス: object

Monitor bandwidth usage for patch operations and other data transfers.

record_patch_bandwidth(agent_id, patch_size_bytes, operation_count, is_fallback=False)[ソース]

Record bandwidth usage for patch operations.

戻り値の型:

None

パラメータ:
  • agent_id (str)

  • patch_size_bytes (int)

  • operation_count (int)

  • is_fallback (bool)

Args:

  • agent_id: Agent identifier

  • patch_size_bytes: Size of patch data in bytes

  • operation_count: Number of patch operations

  • is_fallback: Whether this was a fallback to full snapshot

record_data_transfer(direction, component, bytes_transferred)[ソース]

Record general data transfer.

戻り値の型:

None

パラメータ:
  • direction (str)

  • component (str)

  • bytes_transferred (int)

Args:

  • direction: Transfer direction (inbound/outbound)

  • component: Component name

  • bytes_transferred: Number of bytes transferred

gunn.utils.timing module

TimedQueue implementation for latency simulation.

This module provides a priority queue for timed delivery with latency simulation, supporting concurrent operations and precise timing control.

class gunn.utils.timing.TimedQueue[ソース]

ベースクラス: object

Priority queue for timed delivery with latency simulation.

Supports scheduling items for delivery at specific times with proper locking for put operations and lock-free sleep for get operations to avoid blocking.

Requirements addressed:

  • 6.4: ObservationDelta delivery latency (core in-proc) ≤ 20ms

  • 6.5: Timed delivery using per-agent TimedQueues with latency models

  • 4.7: Non-blocking operations per agent

async put_at(deliver_at, item)[ソース]

Schedule item for delivery at specific time.

戻り値の型:

None

Args:

  • deliver_at: Absolute time (from asyncio.get_running_loop().time()) when item should be delivered

  • item: Item to deliver

Raises:

RuntimeError: If queue is closed

async put_in(delay_seconds, item)[ソース]

Schedule item for delivery after specified delay.

戻り値の型:

None

Args:

  • delay_seconds: Delay in seconds from now

  • item: Item to deliver

async get()[ソース]

Get next item when its delivery time arrives.

Uses lock-free sleep to avoid blocking other operations.

戻り値の型:

Any

Returns:

The next item when its delivery time arrives

Raises:

RuntimeError: If queue is closed and empty

get_nowait()[ソース]

Get next item if it's ready for delivery now.

戻り値の型:

Any

Returns:

The next item if ready, otherwise raises asyncio.QueueEmpty

Raises:

  • asyncio.QueueEmpty: If no items are ready for delivery

  • RuntimeError: If queue is closed and empty

qsize()[ソース]

Return the approximate size of the queue.

戻り値の型:

int

empty()[ソース]

Return True if the queue is empty.

戻り値の型:

bool

async close()[ソース]

Close the queue, preventing new items from being added.

Existing get() operations will continue until the queue is empty.

戻り値の型:

None

peek_next_delivery_time()[ソース]

Get the delivery time of the next item without removing it.

戻り値の型:

float | None

Returns:

Delivery time of next item, or None if queue is empty
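The timed-delivery idea can be sketched with a heap keyed on delivery time. This is an assumption-laden illustration, not the library's implementation: SketchTimedQueue and its internals are hypothetical, and closing and get_nowait() are omitted. The lock is held only while touching the heap; the wait for the next delivery time happens outside it, so a concurrent put is never blocked by a sleeping get.

```python
import asyncio
import heapq
import itertools
from typing import Any


class SketchTimedQueue:
    """Deliver items no earlier than their scheduled event-loop time."""

    def __init__(self) -> None:
        self._heap: list = []                 # (deliver_at, tie_breaker, item)
        self._counter = itertools.count()     # keeps ordering stable for equal times
        self._lock = asyncio.Lock()
        self._changed = asyncio.Event()       # set whenever a new item arrives

    async def put_in(self, delay_seconds: float, item: Any) -> None:
        deliver_at = asyncio.get_running_loop().time() + delay_seconds
        async with self._lock:
            heapq.heappush(self._heap, (deliver_at, next(self._counter), item))
        self._changed.set()

    async def get(self) -> Any:
        loop = asyncio.get_running_loop()
        while True:
            async with self._lock:
                if self._heap and self._heap[0][0] <= loop.time():
                    return heapq.heappop(self._heap)[2]
                # Time until the earliest item is due, or None to wait for a put.
                wait = self._heap[0][0] - loop.time() if self._heap else None
                self._changed.clear()
            try:
                # Sleep without holding the lock; wake early if a put arrives.
                await asyncio.wait_for(self._changed.wait(), timeout=wait)
            except asyncio.TimeoutError:
                pass
```

In this sketch the queue should be created inside a running event loop, so that the lock and event belong to that loop on older Python versions.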

gunn.utils.timing_demo module

Module contents

exception gunn.utils.BackpressureError(agent_id, queue_type, current_depth, threshold, policy='defer')[ソース]

ベースクラス: SimulationError

Error raised when backpressure limits are exceeded.

This error occurs when queue depths or processing rates exceed configured thresholds, triggering backpressure policies.

Requirements addressed:

  • 10.2: Backpressure policies (defer, shed oldest, drop newest)

  • 10.5: Queue depth monitoring and backpressure triggers

パラメータ:
  • agent_id (str)

  • queue_type (str)

  • current_depth (int)

  • threshold (int)

  • policy (str)

class gunn.utils.BackpressureManager[ソース]

ベースクラス: object

Manager for applying backpressure policies to queues.

Provides a unified interface for managing different backpressure policies and applying them consistently across the system.

Requirements addressed:

  • 10.2: Configurable backpressure policies per agent class

  • 10.5: Queue depth monitoring and backpressure triggers

property available_policies: list[str]

Get list of available policy names.

create_policy(policy_name, threshold, agent_id='unknown')[ソース]

Create a backpressure policy instance.

戻り値の型:

BackpressurePolicy[Any]

パラメータ:
  • policy_name (str)

  • threshold (int)

  • agent_id (str)

Args:

  • policy_name: Name of the policy (defer, shed_oldest, drop_newest)

  • threshold: Queue depth threshold

  • agent_id: Agent identifier for metrics

Returns:

BackpressurePolicy instance

Raises:

ValueError: If policy_name is not recognized

register_policy(name, policy_class)[ソース]

Register a custom backpressure policy.

戻り値の型:

None

Args:

  • name: Policy name

  • policy_class: Policy class implementing BackpressurePolicy

class gunn.utils.BackpressureQueue(policy, maxsize=0, queue_type='queue')[ソース]

ベースクラス: Generic

Queue with integrated backpressure policy support.

A queue implementation that automatically applies backpressure policies when depth thresholds are exceeded.

Requirements addressed:

  • 10.2: Backpressure policies integrated with queue operations

  • 10.5: Automatic backpressure triggers based on queue depth

empty()[source]

Check if queue is empty.

戻り値の型:

bool

full()[ソース]

Check if queue is full (based on maxsize).

戻り値の型:

bool

async get()[ソース]

Get an item from the queue.

戻り値の型:

TypeVar(T)

Returns:

Next item from queue

Raises:

asyncio.QueueEmpty: If queue is empty

async put(item)[ソース]

Put an item in the queue with backpressure handling.

戻り値の型:

None

パラメータ:

item (T)

Args:

item: Item to add to queue

Raises:

BackpressureError: If policy defers the operation

qsize()[ソース]

Get current queue size.

戻り値の型:

int

class gunn.utils.CircuitBreaker(failure_threshold=5, recovery_timeout=30.0, half_open_max_calls=3)[ソース]

ベースクラス: object

Circuit breaker for fault tolerance with failure thresholds.

Implements the circuit breaker pattern to prevent cascading failures by temporarily blocking operations when failure rates exceed thresholds.

Requirements addressed:

  • 10.5: Circuit breaker patterns per agent or adapter

パラメータ:
  • failure_threshold (int)

  • recovery_timeout (float)

  • half_open_max_calls (int)

async async_call(func, *args, **kwargs)[ソース]

Execute async function with circuit breaker protection.

戻り値の型:

Any

Args:

  • func: Async function to execute

  • *args: Function arguments

  • **kwargs: Function keyword arguments

Returns:

Function result

Raises:

CircuitBreakerOpenError: If circuit breaker is open

call(func, *args, **kwargs)[ソース]

Execute function with circuit breaker protection.

戻り値の型:

Any

Args:

  • func: Function to execute

  • *args: Function arguments

  • **kwargs: Function keyword arguments

Returns:

Function result

Raises:

CircuitBreakerOpenError: If circuit breaker is open

property is_closed: bool

Check if circuit breaker is closed.

property is_half_open: bool

Check if circuit breaker is half-open.

property is_open: bool

Check if circuit breaker is open.
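The CLOSED / OPEN / HALF_OPEN cycle can be sketched as a small state machine. This is an illustrative simplification under stated assumptions: SketchCircuitBreaker is a hypothetical class, it raises a plain RuntimeError instead of CircuitBreakerOpenError, it ignores half_open_max_calls, and the injectable clock argument exists only to make the behaviour easy to test.

```python
import time
from typing import Any, Callable, Optional


class SketchCircuitBreaker:
    """Open after repeated failures; allow a trial call after a timeout."""

    def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 30.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None

    @property
    def state(self) -> str:
        if self._opened_at is None:
            return "CLOSED"
        if self._clock() - self._opened_at >= self.recovery_timeout:
            return "HALF_OPEN"  # timeout elapsed: a trial call is allowed
        return "OPEN"

    def call(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        if self.state == "OPEN":
            raise RuntimeError("circuit breaker is open")
        try:
            result = func(*args, **kwargs)
        except Exception:
            self._failures += 1
            # A failed trial call, or too many failures, (re)opens the circuit.
            if self.state == "HALF_OPEN" or self._failures >= self.failure_threshold:
                self._opened_at = self._clock()
            raise
        # Any success closes the circuit and resets the failure count.
        self._failures = 0
        self._opened_at = None
        return result
```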

exception gunn.utils.CircuitBreakerOpenError(component, failure_count, threshold)[ソース]

ベースクラス: SimulationError

Error raised when circuit breaker is open.

This error occurs when a circuit breaker is in the open state, preventing operations from being executed to avoid cascading failures.

Requirements addressed:

  • 10.5: Circuit breaker patterns per agent or adapter

パラメータ:
  • component (str)

  • failure_count (int)

  • threshold (int)

class gunn.utils.DeferPolicy(threshold, agent_id='unknown')[ソース]

ベースクラス: BackpressurePolicy, Generic

Defer policy - block new items when threshold exceeded.

This is the default policy that raises BackpressureError to defer processing until queue depth decreases below threshold.

Requirements addressed:

  • 10.2: Backpressure policies with defer as default

パラメータ:
  • threshold (int)

  • agent_id (str)

async handle_overflow(queue, new_item)[ソース]

Handle overflow by deferring (raising BackpressureError).

戻り値の型:

bool

パラメータ:
  • queue (deque)

  • new_item (T)

Args:

  • queue: The overflowing queue

  • new_item: New item to be deferred

Returns:

False (never handles the item, always defers)

Raises:

BackpressureError: Always raised to defer the operation

property policy_name: str

Get the policy name for metrics and logging.

class gunn.utils.DropNewestPolicy(threshold, agent_id='unknown')[ソース]

ベースクラス: BackpressurePolicy, Generic

Drop newest policy - drop new items when threshold exceeded.

When queue exceeds threshold, drops the new item instead of adding it, preserving existing queued items.

Requirements addressed:

  • 10.2: Backpressure policies with drop newest option

パラメータ:
  • threshold (int)

  • agent_id (str)

async handle_overflow(queue, new_item)[ソース]

Handle overflow by dropping the new item.

戻り値の型:

bool

パラメータ:
  • queue (deque)

  • new_item (T)

Args:

  • queue: The overflowing queue

  • new_item: New item to potentially drop

Returns:

True (item was handled by dropping it)

property policy_name: str

Get the policy name for metrics and logging.

class gunn.utils.ErrorRecoveryPolicy(max_retries=3, retry_delay_ms=100.0, backoff_multiplier=2.0, max_delay_ms=5000.0)[ソース]

ベースクラス: object

Policy for handling different types of errors with recovery strategies.

Provides configurable recovery strategies for different error types to enable graceful degradation and automatic recovery.

Requirements addressed:

  • 10.4: Partial failures remain consistent with event log

  • 10.5: Circuit breaker patterns and error recovery

パラメータ:
  • max_retries (int)

  • retry_delay_ms (float)

  • backoff_multiplier (float)

  • max_delay_ms (float)

calculate_retry_delay(attempt)[ソース]

Calculate retry delay with exponential backoff.

戻り値の型:

float

パラメータ:

attempt (int)

Args:

attempt: Retry attempt number (0-based)

Returns:

Delay in milliseconds
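A common form of capped exponential backoff, shown here as an assumption about how the constructor parameters combine (the exact formula used by calculate_retry_delay is not given in this reference; sketch_retry_delay is a hypothetical name):

```python
def sketch_retry_delay(attempt: int,
                       retry_delay_ms: float = 100.0,
                       backoff_multiplier: float = 2.0,
                       max_delay_ms: float = 5000.0) -> float:
    """Delay in ms for a 0-based attempt: base * multiplier**attempt, capped."""
    return min(retry_delay_ms * backoff_multiplier ** attempt, max_delay_ms)
```

With the default values this gives 100, 200, 400 ms for attempts 0, 1, 2 and never exceeds 5000 ms.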

handle_backpressure(error)[ソース]

Handle backpressure error.

戻り値の型:

RecoveryAction

パラメータ:

error (BackpressureError)

Args:

error: The backpressure error

Returns:

Recovery action based on backpressure policy

handle_circuit_breaker(error)[ソース]

Handle circuit breaker open error.

戻り値の型:

RecoveryAction

パラメータ:

error (CircuitBreakerOpenError)

Args:

error: The circuit breaker error

Returns:

Recovery action (typically RETRY_WITH_DELAY)

handle_intent_conflict(error)[ソース]

Handle intent conflict error.

戻り値の型:

RecoveryAction

パラメータ:

error (IntentConflictError)

Args:

error: The intent conflict error

Returns:

Recovery action (typically RETRY_WITH_DELAY)

handle_quota_exceeded(error)[ソース]

Handle quota exceeded error.

戻り値の型:

RecoveryAction

パラメータ:

error (QuotaExceededError)

Args:

error: The quota exceeded error

Returns:

Recovery action (typically DEFER)

handle_stale_context(error)[ソース]

Handle stale context error.

戻り値の型:

RecoveryAction

パラメータ:

error (StaleContextError)

Args:

error: The stale context error

Returns:

Recovery action (typically REGENERATE)

handle_timeout(error)[ソース]

Handle timeout error.

戻り値の型:

RecoveryAction

パラメータ:

error (TimeoutError)

Args:

error: The timeout error

Returns:

Recovery action (typically ABORT)

handle_validation(error)[ソース]

Handle validation error.

戻り値の型:

RecoveryAction

パラメータ:

error (ValidationError)

Args:

error: The validation error

Returns:

Recovery action (typically ABORT)

should_retry(error, attempt)[ソース]

Determine if error should be retried.

戻り値の型:

bool

Args:

  • error: The error that occurred

  • attempt: Current attempt number (0-based)

Returns:

True if should retry, False otherwise

exception gunn.utils.IntentConflictError(intent, conflicting_effects)[ソース]

ベースクラス: SimulationError

Error raised when intent conflicts with existing effects.

This error occurs when an intent cannot be executed due to conflicts with recently processed effects or other intents.

Requirements addressed:

  • 3.4: Intent conflict resolution

  • 10.3: Structured error codes including INTENT_CONFLICT

class gunn.utils.MemoryConfig(**data)[source]

ベースクラス: BaseModel

Configuration for memory management.

パラメータ:
  • max_log_entries (int)

  • view_cache_size (int)

  • compaction_threshold (int)

  • snapshot_interval (int)

  • max_snapshots (int)

  • memory_check_interval_seconds (float)

  • auto_compaction_enabled (bool)

model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

max_log_entries: int

view_cache_size: int

compaction_threshold: int

snapshot_interval: int

max_snapshots: int

memory_check_interval_seconds: float

auto_compaction_enabled: bool

class gunn.utils.MemoryManager(config)[source]

ベースクラス: object

Central memory management for the simulation core.

Handles WorldState snapshots, log compaction, view cache eviction, and memory usage monitoring with configurable limits.

Requirements addressed:

  • 7.3: WorldState snapshot creation every N events for faster replay

  • 11.4: Memory limits and compaction correctness

パラメータ:

config (MemoryConfig)

async check_and_create_snapshot(current_seq, world_state, sim_time=0.0)[ソース]

Check if a snapshot should be created and create it if needed.

戻り値の型:

WorldStateSnapshot | None

Args:

  • current_seq: Current global sequence number

  • world_state: Current world state

  • sim_time: Current simulation time

Returns:

Created snapshot or None if no snapshot was needed

async check_memory_limits(event_log)[ソース]

Check if memory limits are exceeded and trigger compaction if needed.

戻り値の型:

bool

パラメータ:

event_log (EventLog)

Args:

event_log: Event log to check

Returns:

True if memory is within limits

async cleanup()[ソース]

Cleanup memory manager resources.

戻り値の型:

None

async compact_log(event_log)[ソース]

Compact old log entries while preserving replay capability.

Removes old entries from the log while ensuring that replay capability is preserved through snapshots. Only compacts if there are sufficient snapshots to maintain replay integrity.

戻り値の型:

int

パラメータ:

event_log (EventLog)

Args:

event_log: Event log to compact

Returns:

Number of entries removed

estimate_memory_usage(event_log)[ソース]

Return current memory usage statistics.

戻り値の型:

MemoryStats

パラメータ:

event_log (EventLog)

Args:

event_log: Event log to analyze

Returns:

Memory usage statistics

evict_old_views(max_age_seconds=3600.0)[ソース]

Remove old cached views to free memory.

戻り値の型:

int

パラメータ:

max_age_seconds (float)

Args:

max_age_seconds: Maximum age for cached views

Returns:

Number of views evicted

get_detailed_stats(event_log)[ソース]

Get comprehensive memory management statistics.

戻り値の型:

dict[str, Any]

パラメータ:

event_log (EventLog)

Args:

event_log: Event log to analyze

Returns:

Detailed statistics dictionary

class gunn.utils.MemoryStats(**data)[ソース]

ベースクラス: BaseModel

Memory usage statistics.

パラメータ:
  • total_log_entries (int)

  • total_snapshots (int)

  • view_cache_size (int)

  • estimated_memory_mb (float)

  • oldest_entry_age_seconds (float)

  • newest_entry_age_seconds (float)

  • compaction_eligible_entries (int)

model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

total_log_entries: int

total_snapshots: int

view_cache_size: int

estimated_memory_mb: float

oldest_entry_age_seconds: float

newest_entry_age_seconds: float

compaction_eligible_entries: int

class gunn.utils.PriorityScheduler(priority_levels=3, max_queue_depth=1000)[source]

ベースクラス: object

Simple priority-based scheduler without fairness.

Processes intents strictly by priority level, which can lead to starvation of lower-priority intents but ensures highest priority intents are always processed first.

パラメータ:
  • priority_levels (int)

  • max_queue_depth (int)

dequeue()[ソース]

Dequeue highest priority intent.

戻り値の型:

Intent | None

Returns:

Next intent to process, or None if all queues are empty

enqueue(intent, priority=0)[ソース]

Enqueue intent by priority.

戻り値の型:

None

Args:

  • intent: Intent to enqueue

  • priority: Priority level (0 = highest)

Raises:

ValueError: If priority is invalid or queue is full

get_queue_depth(agent_id=None)[ソース]

Get total queue depth.

戻り値の型:

int

パラメータ:

agent_id (str | None)

Args:

agent_id: Ignored for priority scheduler

Returns:

Total queue depth

get_stats()[ソース]

Get scheduler statistics.

戻り値の型:

dict[str, Any]

Returns:

Dictionary with scheduler statistics

exception gunn.utils.QuotaExceededError(agent_id, quota_type, limit, current=0)[ソース]

ベースクラス: SimulationError

Error raised when agent exceeds quota limits.

This error occurs when an agent attempts to perform an action that would exceed their configured quota limits.

Requirements addressed:

  • 1.5: Quota limits and permissions

  • 10.3: Structured error codes including QUOTA_EXCEEDED

class gunn.utils.RecoveryAction(*values)[source]

ベースクラス: Enum

Recovery actions for error handling.

RETRY = 'retry'
ABORT = 'abort'
REGENERATE = 'regenerate'
RETRY_WITH_DELAY = 'retry_with_delay'
MODIFY_INTENT = 'modify_intent'
DEFER = 'defer'
SHED_OLDEST = 'shed_oldest'
DROP_NEWEST = 'drop_newest'

class gunn.utils.ShedOldestPolicy(threshold, agent_id='unknown')[source]

ベースクラス: BackpressurePolicy, Generic

Shed oldest policy - drop oldest items to make room for new ones.

When queue exceeds threshold, removes oldest items to make room for new items, maintaining queue size at threshold.

Requirements addressed:

  • 10.2: Backpressure policies with shed oldest option

パラメータ:
  • threshold (int)

  • agent_id (str)

async handle_overflow(queue, new_item)[ソース]

Handle overflow by dropping oldest items.

戻り値の型:

bool

パラメータ:
  • queue (deque)

  • new_item (T)

Args:

  • queue: The overflowing queue

  • new_item: New item to add

Returns:

True (item was handled by adding and shedding old items)

property policy_name: str

Get the policy name for metrics and logging.
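The difference between shedding the oldest items and dropping the newest one can be sketched on a plain deque. The two helper functions below are hypothetical and take the threshold as an argument; they only illustrate the queue effects described for ShedOldestPolicy and DropNewestPolicy, without metrics or errors.

```python
from collections import deque


def sketch_shed_oldest(queue: deque, new_item, threshold: int) -> bool:
    """Add the new item, then drop from the front until size <= threshold."""
    queue.append(new_item)
    while len(queue) > threshold:
        queue.popleft()
    return True  # item was handled (added)


def sketch_drop_newest(queue: deque, new_item, threshold: int) -> bool:
    """Keep existing items; add the new one only if there is room."""
    if len(queue) < threshold:
        queue.append(new_item)
    return True  # item was handled (added or dropped)
```

Shedding favours fresh data at the cost of history, while dropping the newest preserves ordering of what is already queued.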

exception gunn.utils.SimulationError(message, recovery_action=RecoveryAction.ABORT)[ソース]

ベースクラス: Exception

Base exception for simulation errors.

class gunn.utils.SnapshotManager(config)[source]

ベースクラス: object

Manages WorldState snapshots for faster replay.

パラメータ:

config (MemoryConfig)

async create_snapshot(global_seq, world_state, sim_time=0.0)[ソース]

Create a new WorldState snapshot.

戻り値の型:

WorldStateSnapshot

Args:

  • global_seq: Global sequence number for this snapshot

  • world_state: Current world state to snapshot

  • sim_time: Simulation time

Returns:

Created snapshot

find_nearest_snapshot(target_seq)[ソース]

Find the snapshot closest to but not exceeding target sequence.

戻り値の型:

WorldStateSnapshot | None

パラメータ:

target_seq (int)

Args:

target_seq: Target global sequence number

Returns:

Nearest snapshot or None if no suitable snapshot exists
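"Closest to but not exceeding" is a predecessor lookup, which a binary search over sorted sequence numbers handles directly. The sketch below works on bare integers rather than WorldStateSnapshot objects, and sketch_find_nearest is a hypothetical name; how the real manager stores its snapshots is not stated here.

```python
import bisect
from typing import List, Optional


def sketch_find_nearest(snapshot_seqs: List[int], target_seq: int) -> Optional[int]:
    """Largest sequence number <= target_seq from a sorted list, else None."""
    idx = bisect.bisect_right(snapshot_seqs, target_seq)
    return snapshot_seqs[idx - 1] if idx > 0 else None
```

Replay would then start from the returned snapshot and apply only the log entries after it.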

get_all_snapshots()[ソース]

Get all snapshots ordered by global_seq.

戻り値の型:

list[WorldStateSnapshot]

Returns:

List of all snapshots

get_stats()[ソース]

Get snapshot statistics.

戻り値の型:

dict[str, Any]

Returns:

Dictionary with snapshot statistics

async validate_snapshot_integrity(snapshot)[ソース]

Validate snapshot integrity using checksum.

戻り値の型:

bool

パラメータ:

snapshot (WorldStateSnapshot)

Args:

snapshot: Snapshot to validate

Returns:

True if snapshot is valid

exception gunn.utils.StaleContextError(req_id, expected_seq, actual_seq, threshold=0)[ソース]

ベースクラス: SimulationError

Error raised when intent context becomes stale.

This error occurs when an agent's context (view_seq) is outdated compared to the current world state, indicating the intent was based on stale information.

Requirements addressed:

  • 4.2: Staleness detection using latest_view_seq > context_seq + threshold

  • 10.3: Structured error codes including STALE_CONTEXT

パラメータ:
  • req_id (str)

  • expected_seq (int)

  • actual_seq (int)

  • threshold (int)
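The staleness rule quoted in requirement 4.2 is a single comparison. The helper below is a hypothetical illustration of that rule only; how it maps onto the expected_seq and actual_seq constructor arguments is not specified in this reference.

```python
def sketch_is_stale(latest_view_seq: int, context_seq: int, threshold: int = 0) -> bool:
    """True when the view has advanced more than `threshold` past the context."""
    return latest_view_seq > context_seq + threshold
```

A threshold of 0 treats any newer view as stale; a larger threshold tolerates that many intervening updates.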

class gunn.utils.TimedQueue[ソース]

ベースクラス: object

Priority queue for timed delivery with latency simulation.

Supports scheduling items for delivery at specific times with proper locking for put operations and lock-free sleep for get operations to avoid blocking.

Requirements addressed:

  • 6.4: ObservationDelta delivery latency (core in-proc) ≤ 20ms

  • 6.5: Timed delivery using per-agent TimedQueues with latency models

  • 4.7: Non-blocking operations per agent

async close()[ソース]

Close the queue, preventing new items from being added.

Existing get() operations will continue until the queue is empty.

戻り値の型:

None

empty()[ソース]

Return True if the queue is empty.

戻り値の型:

bool

async get()[ソース]

Get next item when its delivery time arrives.

Uses lock-free sleep to avoid blocking other operations.

戻り値の型:

Any

Returns:

The next item when its delivery time arrives

Raises:

RuntimeError: If queue is closed and empty

get_nowait()[ソース]

Get next item if it's ready for delivery now.

戻り値の型:

Any

Returns:

The next item if ready, otherwise raises asyncio.QueueEmpty

Raises:

  • asyncio.QueueEmpty: If no items are ready for delivery

  • RuntimeError: If queue is closed and empty

peek_next_delivery_time()[ソース]

Get the delivery time of the next item without removing it.

戻り値の型:

float | None

Returns:

Delivery time of next item, or None if queue is empty

async put_at(deliver_at, item)[ソース]

Schedule item for delivery at specific time.

戻り値の型:

None

Args:

  • deliver_at: Absolute time (from asyncio.get_running_loop().time()) when item should be delivered

  • item: Item to deliver

Raises:

RuntimeError: If queue is closed

async put_in(delay_seconds, item)[ソース]

Schedule item for delivery after specified delay.

戻り値の型:

None

Args:

  • delay_seconds: Delay in seconds from now

  • item: Item to deliver

qsize()[ソース]

Return the approximate size of the queue.

戻り値の型:

int

exception gunn.utils.TimeoutError(operation, deadline_ms, actual_ms)[ソース]

ベースクラス: SimulationError

Error raised when operations exceed deadline.

This error occurs when intent processing or generation takes longer than the configured deadline.

Requirements addressed:

  • 3.5: Deadline enforcement for intent processing

  • 10.3: Structured error codes including TIMEOUT

exception gunn.utils.ValidationError(intent, validation_failures)[source]

ベースクラス: SimulationError

Error raised when intent validation fails.

This error occurs when an intent fails validation checks before being converted to an effect.

Requirements addressed:

  • 1.5: Intent validation through EffectValidator

  • 3.4: Validation for quota limits, cooldowns, and permissions

class gunn.utils.ViewCache(max_size=1000)[source]

ベースクラス: object

LRU cache for agent views with configurable size limits.

パラメータ:

max_size (int)

clear()[ソース]

Clear all cached entries.

戻り値の型:

None

evict_old_entries(max_age_seconds=3600.0)[ソース]

Evict entries older than specified age.

戻り値の型:

int

パラメータ:

max_age_seconds (float)

Args:

max_age_seconds: Maximum age for cached entries

Returns:

Number of entries evicted

get(key)[ソース]

Get cached view and mark as recently used.

戻り値の型:

Any

パラメータ:

key (str)

Args:

key: Cache key (typically agent_id:view_seq)

Returns:

Cached view or None if not found

get_stats()[ソース]

Get cache statistics.

戻り値の型:

dict[str, Any]

Returns:

Dictionary with cache statistics

put(key, value)[ソース]

Store view in cache with LRU eviction.

戻り値の型:

None

Args:

  • key: Cache key

  • value: View to cache
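LRU eviction is commonly built on an ordered mapping. The sketch below assumes that approach; SketchViewCache is a hypothetical class, and age-based eviction and statistics are left out.

```python
from collections import OrderedDict
from typing import Any


class SketchViewCache:
    """Least-recently-used cache with a fixed maximum number of entries."""

    def __init__(self, max_size: int = 1000) -> None:
        self.max_size = max_size
        self._entries: "OrderedDict[str, Any]" = OrderedDict()

    def get(self, key: str) -> Any:
        if key not in self._entries:
            return None
        self._entries.move_to_end(key)  # mark as most recently used
        return self._entries[key]

    def put(self, key: str, value: Any) -> None:
        self._entries[key] = value
        self._entries.move_to_end(key)
        while len(self._entries) > self.max_size:
            self._entries.popitem(last=False)  # evict least recently used
```

Because get() returns None on a miss, a cached value of None cannot be told apart from an absent key in this sketch.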

class gunn.utils.WeightedRoundRobinScheduler(default_weight=1, max_queue_depth=100, priority_levels=3)[ソース]

ベースクラス: object

Weighted Round Robin scheduler for fair intent processing.

Provides fair scheduling across agents with configurable weights and priority levels. Ensures no agent can monopolize processing while respecting priority and fairness constraints.

Requirements addressed:

  • 3.4: Priority and fairness policies

  • 9: Fairness (Weighted Round Robin) in intent validation pipeline

パラメータ:
  • default_weight (int)

  • max_queue_depth (int)

  • priority_levels (int)

clear()[ソース]

Clear all queues and return number of dropped intents.

戻り値の型:

int

Returns:

Total number of intents that were dropped

dequeue()[ソース]

Dequeue the next intent using weighted round robin.

戻り値の型:

Intent | None

Returns:

Next intent to process, or None if all queues are empty

enqueue(intent, priority=0)[ソース]

Enqueue an intent for processing.

戻り値の型:

None

Args:

  • intent: Intent to enqueue

  • priority: Priority level (0 = highest priority)

Raises:

ValueError: If priority is invalid or queue is full

get_agent_queue_depths()[source]

Get queue depths for all agents.

Return type:

dict[str, int]

Returns:

Dictionary mapping agent_id to queue depth

get_priority_distribution(agent_id)[source]

Get distribution of intents by priority for an agent.

Return type:

dict[int, int]

Parameters:

agent_id (str)

Args:

agent_id: Agent identifier

Returns:

Dictionary mapping priority to count

get_queue_depth(agent_id=None)[source]

Get queue depth for a specific agent, or the total across all agents.

Return type:

int

Parameters:

agent_id (str | None)

Args:

agent_id: Agent ID to check, or None for total depth

Returns:

Queue depth

get_stats()[source]

Get scheduler statistics.

Return type:

dict[str, Any]

Returns:

Dictionary with comprehensive scheduler statistics

remove_agent(agent_id)[source]

Remove an agent and return the number of dropped intents.

Return type:

int

Parameters:

agent_id (str)

Args:

agent_id: Agent to remove

Returns:

Number of intents that were dropped

set_agent_weight(agent_id, weight)[source]

Set the weight for a specific agent.

Return type:

None

Args:

agent_id: Agent identifier
weight: Weight for this agent (higher = more processing time)

Raises:

ValueError: If weight is not positive
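
The weighted round robin idea behind dequeue() and set_agent_weight() can be sketched as follows. This is a simplified illustration, not gunn's scheduler: the class SketchWRR is hypothetical, it ignores priority levels and max_queue_depth, and it assumes that an agent with weight w may have up to w items served per turn before the next agent is visited.

```python
from collections import deque
from typing import Any


class SketchWRR:
    """Hypothetical weighted round robin over per-agent FIFO queues."""

    def __init__(self, default_weight: int = 1) -> None:
        self.default_weight = default_weight
        self.weights: dict[str, int] = {}
        self.queues: dict[str, deque] = {}
        self._order: list[str] = []  # agents in round-robin order
        self._index = 0              # position of the agent being served
        self._served = 0             # items served to that agent this turn

    def set_agent_weight(self, agent_id: str, weight: int) -> None:
        if weight <= 0:
            raise ValueError("weight must be positive")
        self.weights[agent_id] = weight

    def enqueue(self, agent_id: str, item: Any) -> None:
        if agent_id not in self.queues:
            self.queues[agent_id] = deque()
            self._order.append(agent_id)
        self.queues[agent_id].append(item)

    def dequeue(self) -> Any:
        # Return None when every queue is empty.
        if not any(self.queues.values()):
            return None
        while True:
            agent_id = self._order[self._index]
            quota = self.weights.get(agent_id, self.default_weight)
            queue = self.queues[agent_id]
            if queue and self._served < quota:
                self._served += 1
                return queue.popleft()
            # Turn is over (quota used or queue empty): move to next agent.
            self._index = (self._index + 1) % len(self._order)
            self._served = 0


scheduler = SketchWRR()
scheduler.set_agent_weight("a", 2)
for item in ("a1", "a2", "a3"):
    scheduler.enqueue("a", item)
for item in ("b1", "b2"):
    scheduler.enqueue("b", item)
served = [scheduler.dequeue() for _ in range(6)]
```

With weight 2 for agent "a" and the default weight 1 for agent "b", the sketch serves two items from "a" for every one from "b" while both have work queued, so neither agent can starve the other.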

class gunn.utils.WorldStateSnapshot(**data)[source]

Bases: BaseModel

Snapshot of world state at a specific point in time.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

global_seq: int
sim_time: float
wall_time: float
world_state: WorldState
checksum: str
gunn.utils.canonical_json(obj)[source]

Generate canonical JSON serialization for consistent hashing.

Uses orjson with sorted keys to ensure deterministic output regardless of dictionary key ordering.

Return type:

bytes

Parameters:

obj (dict[str, Any])

Args:

obj: Dictionary to serialize

Returns:

Canonical JSON bytes representation

Example:
>>> data = {"b": 2, "a": 1}
>>> canonical_json(data)
b'{"a":1,"b":2}'
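
For readers without orjson installed, the same idea can be approximated with the standard library. The function name canonical_json_sketch is hypothetical, and the output is only assumed to match orjson for simple values such as the example above; floats and some other types may serialize differently.

```python
import json
from typing import Any


def canonical_json_sketch(obj: dict[str, Any]) -> bytes:
    """Stdlib approximation: sorted keys, no whitespace, UTF-8 bytes."""
    text = json.dumps(
        obj,
        sort_keys=True,          # deterministic key order
        separators=(",", ":"),   # compact form without spaces
        ensure_ascii=False,      # emit UTF-8 instead of \u escapes
    )
    return text.encode("utf-8")


first = canonical_json_sketch({"b": 2, "a": 1})
second = canonical_json_sketch({"a": 1, "b": 2})
```

Both calls yield identical bytes, which is the property a hash over the serialized form relies on.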
gunn.utils.chain_checksum(effect, prev_checksum=None)[source]

Generate hash chain checksum using SHA-256.

Creates a hash chain by combining the previous checksum with the canonical JSON representation of the current effect.

Return type:

str

Args:

effect: Effect dictionary to hash
prev_checksum: Previous checksum in the chain (None for first entry)

Returns:

Hexadecimal SHA-256 hash string

Example:
>>> effect = {"kind": "test", "payload": {}}
>>> chain_checksum(effect)
'a1b2c3...'
>>> chain_checksum(effect, "prev_hash")
'd4e5f6...'
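
A hash chain link of this kind can be sketched with hashlib. The exact way gunn combines the previous checksum with the effect is not stated here, so the sketch assumes the previous checksum's UTF-8 bytes are hashed first, followed by a stdlib canonical JSON encoding of the effect; chain_checksum_sketch is a hypothetical name and its digests should not be expected to equal gunn's.

```python
import hashlib
import json
from typing import Any, Optional


def chain_checksum_sketch(
    effect: dict[str, Any], prev_checksum: Optional[str] = None
) -> str:
    """Assumed scheme: SHA-256 over prev_checksum bytes + canonical JSON."""
    hasher = hashlib.sha256()
    if prev_checksum is not None:
        hasher.update(prev_checksum.encode("utf-8"))
    payload = json.dumps(effect, sort_keys=True, separators=(",", ":"))
    hasher.update(payload.encode("utf-8"))
    return hasher.hexdigest()


effect = {"kind": "test", "payload": {}}
head = chain_checksum_sketch(effect)
linked = chain_checksum_sketch(effect, head)
```

Because the previous checksum feeds into the digest, the same effect produces a different checksum at each position in the chain.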
gunn.utils.detect_corruption(entries)[source]

Detect corrupted entries in a hash chain.

Returns indices of entries that have invalid checksums, indicating potential corruption or tampering.

Return type:

list[int]

Parameters:

entries (list[dict[str, Any]])

Args:

entries: List of log entries with 'effect' and 'checksum' fields

Returns:

List of indices where corruption was detected

Example:
>>> entries = [
...     {"effect": {"kind": "test1"}, "checksum": "valid_hash"},
...     {"effect": {"kind": "test2"}, "checksum": "invalid_hash"}
... ]
>>> detect_corruption(entries)
[1]
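
The detection step can be sketched end to end with real digests rather than placeholder strings. The sketch assumes each expected checksum is derived from the previous entry's stored checksum plus the current effect, using the same assumed hashing scheme as a plain SHA-256 chain; the helper names are hypothetical and this is not gunn's implementation.

```python
import hashlib
import json
from typing import Any, Optional


def _checksum(effect: dict[str, Any], prev: Optional[str]) -> str:
    # Assumed scheme: SHA-256 over prev checksum bytes + canonical JSON.
    data = (prev or "").encode("utf-8")
    data += json.dumps(effect, sort_keys=True, separators=(",", ":")).encode("utf-8")
    return hashlib.sha256(data).hexdigest()


def build_chain(effects: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Create log entries whose checksums form a valid chain."""
    entries, prev = [], None
    for effect in effects:
        prev = _checksum(effect, prev)
        entries.append({"effect": dict(effect), "checksum": prev})
    return entries


def detect_corruption_sketch(entries: list[dict[str, Any]]) -> list[int]:
    """Return indices whose stored checksum does not match the recomputed one."""
    corrupted, prev = [], None
    for index, entry in enumerate(entries):
        if entry["checksum"] != _checksum(entry["effect"], prev):
            corrupted.append(index)
        prev = entry["checksum"]  # continue from the stored value
    return corrupted


entries = build_chain([{"kind": "test1"}, {"kind": "test2"}, {"kind": "test3"}])
clean = detect_corruption_sketch(entries)
entries[1]["effect"]["kind"] = "tampered"
flagged = detect_corruption_sketch(entries)
```

Continuing from the stored checksum rather than the recomputed one is a design choice in this sketch: it localizes the report to the tampered entry instead of flagging every entry after it. A chain validity check in the same style would simply test whether the returned list is empty.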
gunn.utils.validate_hash_chain(entries)[source]

Validate integrity of a hash chain sequence.

Verifies that each entry's checksum correctly follows from the previous entry's checksum and the entry's effect data.

Return type:

bool

Parameters:

entries (list[dict[str, Any]])

Args:

entries: List of log entries, each containing 'effect' and 'checksum' fields

Returns:

True if chain is valid, False if corruption detected

Example:
>>> entries = [
...     {"effect": {"kind": "test1"}, "checksum": "abc123"},
...     {"effect": {"kind": "test2"}, "checksum": "def456"}
... ]
>>> validate_hash_chain(entries)
True
gunn.utils.verify_sequence_integrity(entries)[source]

Comprehensive integrity verification of a log sequence.

Performs multiple integrity checks including hash chain validation, sequence gap detection, and corruption analysis.

Return type:

dict[str, Any]

Parameters:

entries (list[dict[str, Any]])

Args:

entries: List of log entries with required fields

Returns:

Dictionary containing an integrity report with:

  • valid: Overall validity boolean

  • corrupted_entries: List of corrupted entry indices

  • missing_sequences: List of detected sequence gaps

  • total_entries: Total number of entries checked

Example:
>>> entries = [{"effect": {...}, "checksum": "...", "global_seq": 1}]
>>> verify_sequence_integrity(entries)
{"valid": True, "corrupted_entries": [], "missing_sequences": [], "total_entries": 1}
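
The sequence gap part of the report can be sketched on its own. The sketch assumes global_seq is expected to increase by exactly 1 between consecutive entries and that missing_sequences lists the absent numbers; the source does not confirm either detail, and find_missing_sequences is a hypothetical helper.

```python
from typing import Any


def find_missing_sequences(entries: list[dict[str, Any]]) -> list[int]:
    """List global_seq values skipped between consecutive entries."""
    missing: list[int] = []
    for prev, cur in zip(entries, entries[1:]):
        # An empty range results when the two values are adjacent.
        missing.extend(range(prev["global_seq"] + 1, cur["global_seq"]))
    return missing


log = [{"global_seq": 1}, {"global_seq": 2}, {"global_seq": 5}]
gaps = find_missing_sequences(log)
```

Here the jump from 2 to 5 is reported as the missing values 3 and 4, while a contiguous log produces an empty list.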