gunn.utils package
Submodules
gunn.utils.backpressure module
Backpressure policies for queue management.
This module implements different backpressure policies to handle queue overflow and resource exhaustion scenarios with configurable strategies.
- class gunn.utils.backpressure.BackpressurePolicy(threshold, agent_id='unknown')[source]
Abstract base class for backpressure policies.
Defines the interface for handling queue overflow, with different strategies for managing resource exhaustion.
Requirements addressed:
- 10.2: Backpressure policies (defer, shed oldest, drop newest)
- 10.5: Queue depth monitoring and backpressure triggers
- abstractmethod async handle_overflow(queue, new_item)[source]
Handle queue overflow when the threshold is exceeded.
- Args:
queue: The queue that is overflowing
new_item: The new item being added
- Returns:
True if the item was handled (added or dropped), False if it should be deferred
- Raises:
BackpressureError: If backpressure should be applied
- class gunn.utils.backpressure.DeferPolicy(threshold, agent_id='unknown')[source]
Bases: BackpressurePolicy, Generic
Defer policy: block new items when the threshold is exceeded.
This is the default policy. It raises BackpressureError to defer processing until queue depth falls below the threshold.
Requirements addressed:
- 10.2: Backpressure policies with defer as default
- class gunn.utils.backpressure.ShedOldestPolicy(threshold, agent_id='unknown')[source]
Bases: BackpressurePolicy, Generic
Shed oldest policy: drop the oldest items to make room for new ones.
When the queue exceeds the threshold, the oldest items are removed to make room for new items, keeping the queue size at the threshold.
Requirements addressed:
- 10.2: Backpressure policies with shed oldest option
- class gunn.utils.backpressure.DropNewestPolicy(threshold, agent_id='unknown')[source]
Bases: BackpressurePolicy, Generic
Drop newest policy: drop new items when the threshold is exceeded.
When the queue exceeds the threshold, the new item is dropped instead of being added, preserving the items already queued.
Requirements addressed:
- 10.2: Backpressure policies with drop newest option
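The three policies above differ only in what happens once the queue is at its threshold. The following is a minimal standalone sketch of that decision, not gunn's implementation: the `offer` function and the `QueueFull` exception are hypothetical stand-ins, and treating "depth equal to threshold" as full is an assumption.

```python
from collections import deque


class QueueFull(Exception):
    """Hypothetical stand-in for gunn's BackpressureError."""


def offer(queue: deque, item, threshold: int, policy: str) -> bool:
    """Try to add item to queue; return True if it ended up enqueued."""
    if len(queue) < threshold:
        queue.append(item)
        return True
    if policy == "defer":
        # The caller is expected to retry once the queue has drained.
        raise QueueFull(f"depth {len(queue)} reached threshold {threshold}")
    if policy == "shed_oldest":
        queue.popleft()  # discard the oldest entry to make room
        queue.append(item)
        return True
    if policy == "drop_newest":
        return False  # discard the incoming item, keep existing entries
    raise ValueError(f"unknown policy: {policy}")
```

With a full three-item queue, `shed_oldest` replaces the head, `drop_newest` leaves the queue unchanged, and `defer` raises so the producer can back off.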
- class gunn.utils.backpressure.BackpressureManager[source]
Bases: object
Manager for applying backpressure policies to queues.
Provides a unified interface for managing different backpressure policies and applying them consistently across the system.
Requirements addressed:
- 10.2: Configurable backpressure policies per agent class
- 10.5: Queue depth monitoring and backpressure triggers
- create_policy(policy_name, threshold, agent_id='unknown')[source]
Create a backpressure policy instance.
- Return type:
- Parameters:
- Args:
policy_name: Name of the policy (defer, shed_oldest, drop_newest)
threshold: Queue depth threshold
agent_id: Agent identifier for metrics
- Returns:
BackpressurePolicy instance
- Raises:
ValueError: If policy_name is not recognized
- class gunn.utils.backpressure.BackpressureQueue(policy, maxsize=0, queue_type='queue')[source]
Bases: Generic
Queue with integrated backpressure policy support.
A queue implementation that automatically applies backpressure policies when depth thresholds are exceeded.
Requirements addressed:
- 10.2: Backpressure policies integrated with queue operations
- 10.5: Automatic backpressure triggers based on queue depth
- Parameters:
policy (BackpressurePolicy)
maxsize (int)
queue_type (str)
- async put(item)[source]
Put an item in the queue with backpressure handling.
- Return type:
- Parameters:
item (T)
- Args:
item: Item to add to the queue
- Raises:
BackpressureError: If the policy defers the operation
gunn.utils.errors module
Structured error types for multi-agent simulation.
This module defines error types for various failure scenarios in the simulation system with recovery actions and structured information.
- class gunn.utils.errors.RecoveryAction(*values)[source]
Bases: Enum
Recovery actions for error handling.
- RETRY = 'retry'
- ABORT = 'abort'
- REGENERATE = 'regenerate'
- RETRY_WITH_DELAY = 'retry_with_delay'
- MODIFY_INTENT = 'modify_intent'
- DEFER = 'defer'
- SHED_OLDEST = 'shed_oldest'
- DROP_NEWEST = 'drop_newest'
- exception gunn.utils.errors.SimulationError(message, recovery_action=RecoveryAction.ABORT)[source]
Bases: Exception
Base exception for simulation errors.
- Parameters:
message (str)
recovery_action (RecoveryAction)
- exception gunn.utils.errors.StaleContextError(req_id, expected_seq, actual_seq, threshold=0)[source]
Bases: SimulationError
Error raised when intent context becomes stale.
This error occurs when an agent's context (view_seq) is outdated compared to the current world state, indicating that the intent was based on stale information.
Requirements addressed:
- 4.2: Staleness detection using latest_view_seq > context_seq + threshold
- 10.3: Structured error codes including STALE_CONTEXT
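The staleness condition quoted in requirement 4.2 can be written as a one-line predicate. This is only an illustration of that inequality; the function name `is_stale` is hypothetical and not part of gunn.

```python
def is_stale(latest_view_seq: int, context_seq: int, threshold: int = 0) -> bool:
    """Return True when the context lags the latest view by more than threshold."""
    return latest_view_seq > context_seq + threshold
```

With the default threshold of 0, any newer view makes the context stale; a threshold of 1 tolerates a lag of one sequence number.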
- exception gunn.utils.errors.IntentConflictError(intent, conflicting_effects)[source]
Bases: SimulationError
Error raised when an intent conflicts with existing effects.
This error occurs when an intent cannot be executed due to conflicts with recently processed effects or other intents.
Requirements addressed:
- 3.4: Intent conflict resolution
- 10.3: Structured error codes including INTENT_CONFLICT
- exception gunn.utils.errors.QuotaExceededError(agent_id, quota_type, limit, current=0)[source]
Bases: SimulationError
Error raised when an agent exceeds quota limits.
This error occurs when an agent attempts to perform an action that would exceed its configured quota limits.
Requirements addressed:
- 1.5: Quota limits and permissions
- 10.3: Structured error codes including QUOTA_EXCEEDED
- exception gunn.utils.errors.TimeoutError(operation, deadline_ms, actual_ms)[source]
Bases: SimulationError
Error raised when operations exceed their deadline.
This error occurs when intent processing or generation takes longer than the configured deadline.
Requirements addressed:
- 3.5: Deadline enforcement for intent processing
- 10.3: Structured error codes including TIMEOUT
- exception gunn.utils.errors.BackpressureError(agent_id, queue_type, current_depth, threshold, policy='defer')[source]
Bases: SimulationError
Error raised when backpressure limits are exceeded.
This error occurs when queue depths or processing rates exceed configured thresholds, triggering backpressure policies.
Requirements addressed:
- 10.2: Backpressure policies (defer, shed oldest, drop newest)
- 10.5: Queue depth monitoring and backpressure triggers
- exception gunn.utils.errors.ValidationError(intent, validation_failures)[source]
Bases: SimulationError
Error raised when intent validation fails.
This error occurs when an intent fails validation checks before being converted to an effect.
Requirements addressed:
- 1.5: Intent validation through EffectValidator
- 3.4: Validation for quota limits, cooldowns, and permissions
- exception gunn.utils.errors.CircuitBreakerOpenError(component, failure_count, threshold)[source]
Bases: SimulationError
Error raised when the circuit breaker is open.
This error occurs when a circuit breaker is in the open state, preventing operations from being executed to avoid cascading failures.
Requirements addressed:
- 10.5: Circuit breaker patterns per agent or adapter
- class gunn.utils.errors.CircuitBreaker(failure_threshold=5, recovery_timeout=30.0, half_open_max_calls=3)[source]
Bases: object
Circuit breaker for fault tolerance with failure thresholds.
Implements the circuit breaker pattern to prevent cascading failures by temporarily blocking operations when failure rates exceed thresholds.
Requirements addressed:
- 10.5: Circuit breaker patterns per agent or adapter
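To make the pattern concrete, here is a minimal standalone sketch of a circuit breaker state machine using the same three constructor parameters. It is not gunn's `CircuitBreaker`: the class name, the `allow`/`record_success`/`record_failure` methods, the injectable `clock`, and the exact transition rules are all assumptions chosen for illustration.

```python
import time


class BreakerSketch:
    """Illustrative CLOSED -> OPEN -> HALF_OPEN -> CLOSED state machine."""

    def __init__(self, failure_threshold=5, recovery_timeout=30.0,
                 half_open_max_calls=3, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_max_calls = half_open_max_calls
        self.clock = clock  # injectable so the sketch can be tested without sleeping
        self.state = "CLOSED"
        self.failures = 0
        self.opened_at = 0.0
        self.half_open_calls = 0

    def allow(self) -> bool:
        """Return True if a call may proceed right now."""
        if self.state == "OPEN":
            if self.clock() - self.opened_at < self.recovery_timeout:
                return False
            # Recovery timeout elapsed: let a limited number of probes through.
            self.state = "HALF_OPEN"
            self.half_open_calls = 0
        if self.state == "HALF_OPEN":
            if self.half_open_calls >= self.half_open_max_calls:
                return False
            self.half_open_calls += 1
        return True

    def record_success(self) -> None:
        self.failures = 0
        self.state = "CLOSED"

    def record_failure(self) -> None:
        self.failures += 1
        # A failed probe, or too many failures while closed, opens the circuit.
        if self.state == "HALF_OPEN" or self.failures >= self.failure_threshold:
            self.state = "OPEN"
            self.opened_at = self.clock()
```

A caller would check `allow()` before each operation and report the outcome afterwards; when `allow()` returns False, raising an error such as CircuitBreakerOpenError is the natural response.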
- class gunn.utils.errors.ErrorRecoveryPolicy(max_retries=3, retry_delay_ms=100.0, backoff_multiplier=2.0, max_delay_ms=5000.0)[source]
Bases: object
Policy for handling different types of errors with recovery strategies.
Provides configurable recovery strategies for different error types to enable graceful degradation and automatic recovery.
Requirements addressed:
- 10.4: Partial failures remain consistent with event log
- 10.5: Circuit breaker patterns and error recovery
- handle_stale_context(error)[source]
Handle stale context error.
- Return type:
- Parameters:
error (StaleContextError)
- Args:
error: The stale context error
- Returns:
Recovery action (typically REGENERATE)
- handle_intent_conflict(error)[source]
Handle intent conflict error.
- Return type:
- Parameters:
error (IntentConflictError)
- Args:
error: The intent conflict error
- Returns:
Recovery action (typically RETRY_WITH_DELAY)
- handle_quota_exceeded(error)[source]
Handle quota exceeded error.
- Return type:
- Parameters:
error (QuotaExceededError)
- Args:
error: The quota exceeded error
- Returns:
Recovery action (typically DEFER)
- handle_timeout(error)[source]
Handle timeout error.
- Return type:
- Parameters:
error (TimeoutError)
- Args:
error: The timeout error
- Returns:
Recovery action (typically ABORT)
- handle_backpressure(error)[source]
Handle backpressure error.
- Return type:
- Parameters:
error (BackpressureError)
- Args:
error: The backpressure error
- Returns:
Recovery action based on backpressure policy
- handle_validation(error)[source]
Handle validation error.
- Return type:
- Parameters:
error (ValidationError)
- Args:
error: The validation error
- Returns:
Recovery action (typically ABORT)
- handle_circuit_breaker(error)[source]
Handle circuit breaker open error.
- Return type:
- Parameters:
error (CircuitBreakerOpenError)
- Args:
error: The circuit breaker error
- Returns:
Recovery action (typically RETRY_WITH_DELAY)
- calculate_retry_delay(attempt)[source]
Calculate retry delay with exponential backoff.
- Args:
attempt: Retry attempt number (0-based)
- Returns:
Delay in milliseconds
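Exponential backoff with a cap is commonly computed as the base delay multiplied by the multiplier raised to the attempt number, clamped to a maximum. The sketch below shows that formula with defaults mirroring the ErrorRecoveryPolicy constructor; whether gunn uses exactly this formula (or adds jitter) is an assumption, and `backoff_delay_ms` is a hypothetical name.

```python
def backoff_delay_ms(attempt: int, base_ms: float = 100.0,
                     multiplier: float = 2.0, cap_ms: float = 5000.0) -> float:
    """Delay before retry number `attempt` (0-based), capped at cap_ms."""
    return min(base_ms * (multiplier ** attempt), cap_ms)
```

With these defaults the delays grow as 100, 200, 400, 800 ms and stop growing once they reach the 5000 ms cap.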
- should_retry(error, attempt)[source]
Determine whether an error should be retried.
- Return type:
- Parameters:
error (SimulationError)
attempt (int)
- Args:
error: The error that occurred
attempt: Current attempt number (0-based)
- Returns:
True if the operation should be retried, False otherwise
- exception gunn.utils.errors.LLMGenerationError(message, provider='unknown', model='unknown')[source]
Bases: SimulationError
Error raised when LLM generation fails.
This error occurs when LLM token generation encounters an unrecoverable error during streaming.
Requirements addressed:
- 6.1: Proper error handling for generation failures
- 10.3: Structured error codes for LLM failures
- exception gunn.utils.errors.LLMTimeoutError(message, timeout_seconds, provider='unknown')[source]
Bases: SimulationError
Error raised when LLM generation times out.
This error occurs when LLM generation exceeds the configured timeout duration.
Requirements addressed:
- 6.3: Timeout handling for LLM generation
- 10.3: Structured error codes for timeout scenarios
gunn.utils.hashing module
Hash chain utilities for log integrity.
This module provides utilities for maintaining integrity of event logs through hash chaining and canonical JSON serialization.
- gunn.utils.hashing.canonical_json(obj)[source]
Generate canonical JSON serialization for consistent hashing.
Uses orjson with sorted keys to ensure deterministic output regardless of dictionary key ordering.
- Args:
obj: Dictionary to serialize
- Returns:
Canonical JSON bytes representation
- Example:
>>> data = {"b": 2, "a": 1}
>>> canonical_json(data)
b'{"a":1,"b":2}'
- gunn.utils.hashing.chain_checksum(effect, prev_checksum=None)[source]
Generate hash chain checksum using SHA-256.
Creates a hash chain by combining the previous checksum with the canonical JSON representation of the current effect.
- Args:
effect: Effect dictionary to hash
prev_checksum: Previous checksum in the chain (None for first entry)
- Returns:
Hexadecimal SHA-256 hash string
- Example:
>>> effect = {"kind": "test", "payload": {}}
>>> chain_checksum(effect)
'a1b2c3...'
>>> chain_checksum(effect, "prev_hash")
'd4e5f6...'
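The idea behind the two functions above can be sketched with the standard library alone. This is an illustration under stated assumptions, not gunn's code: it uses `json.dumps` with sorted keys in place of orjson, and it assumes the previous checksum is fed into the hash before the serialized effect. The `_sketch` function names are hypothetical.

```python
import hashlib
import json
from typing import Optional


def canonical_json_sketch(obj: dict) -> bytes:
    """Serialize with sorted keys and no whitespace so equal dicts hash equally."""
    return json.dumps(obj, sort_keys=True, separators=(",", ":")).encode("utf-8")


def chain_checksum_sketch(effect: dict, prev_checksum: Optional[str] = None) -> str:
    """SHA-256 over the previous checksum (if any) followed by the canonical effect."""
    hasher = hashlib.sha256()
    if prev_checksum is not None:
        hasher.update(prev_checksum.encode("utf-8"))
    hasher.update(canonical_json_sketch(effect))
    return hasher.hexdigest()
```

Because each checksum depends on its predecessor, changing any earlier effect changes every checksum computed after it.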
- gunn.utils.hashing.validate_hash_chain(entries)[source]
Validate integrity of a hash chain sequence.
Verifies that each entry's checksum correctly follows from the previous entry's checksum and the entry's effect data.
- Args:
entries: List of log entries, each containing 'effect' and 'checksum' fields
- Returns:
True if the chain is valid, False if corruption is detected
- Example:
>>> entries = [
...     {"effect": {"kind": "test1"}, "checksum": "abc123"},
...     {"effect": {"kind": "test2"}, "checksum": "def456"}
... ]
>>> validate_hash_chain(entries)
True
- gunn.utils.hashing.detect_corruption(entries)[source]
Detect corrupted entries in a hash chain.
Returns indices of entries that have invalid checksums, indicating potential corruption or tampering.
- Args:
entries: List of log entries with 'effect' and 'checksum' fields
- Returns:
List of indices where corruption was detected
- Example:
>>> entries = [
...     {"effect": {"kind": "test1"}, "checksum": "valid_hash"},
...     {"effect": {"kind": "test2"}, "checksum": "invalid_hash"}
... ]
>>> detect_corruption(entries)
[1]
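Corruption detection over a hash chain amounts to recomputing each checksum from the stored predecessor and comparing. The sketch below is self-contained and makes several assumptions: stdlib `json` instead of orjson, the previous checksum hashed before the effect, and each entry verified against the stored (not recomputed) previous checksum. `build_chain` and `find_corrupted` are hypothetical helper names.

```python
import hashlib
import json


def _checksum(effect, prev_checksum=None):
    """SHA-256 of the previous checksum (if any) plus the canonical effect JSON."""
    hasher = hashlib.sha256()
    if prev_checksum is not None:
        hasher.update(prev_checksum.encode("utf-8"))
    payload = json.dumps(effect, sort_keys=True, separators=(",", ":"))
    hasher.update(payload.encode("utf-8"))
    return hasher.hexdigest()


def build_chain(effects):
    """Produce log entries whose checksums link each effect to its predecessor."""
    entries, prev = [], None
    for effect in effects:
        prev = _checksum(effect, prev)
        entries.append({"effect": effect, "checksum": prev})
    return entries


def find_corrupted(entries):
    """Return indices whose stored checksum does not match a recomputation."""
    corrupted, prev = [], None
    for index, entry in enumerate(entries):
        if entry["checksum"] != _checksum(entry["effect"], prev):
            corrupted.append(index)
        prev = entry["checksum"]  # continue from the stored value
    return corrupted
```

Verifying against the stored predecessor localizes a tampered effect to its own index instead of flagging every later entry.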
- gunn.utils.hashing.verify_sequence_integrity(entries)[source]
Comprehensive integrity verification of a log sequence.
Performs multiple integrity checks including hash chain validation, sequence gap detection, and corruption analysis.
- Args:
entries: List of log entries with required fields
- Returns:
Dictionary containing an integrity report with:
- valid: Overall validity boolean
- corrupted_entries: List of corrupted entry indices
- missing_sequences: List of detected sequence gaps
- total_entries: Total number of entries checked
- Example:
>>> entries = [{"effect": {...}, "checksum": "...", "global_seq": 1}]
>>> verify_sequence_integrity(entries)
{"valid": True, "corrupted_entries": [], "missing_sequences": [], "total_entries": 1}
gunn.utils.hashing_demo module
gunn.utils.memory module
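Sequence gap detection can be illustrated by scanning consecutive `global_seq` values. The sketch assumes the sequence numbers are sorted and that a gap is reported as the list of missing numbers; the actual shape of `missing_sequences` in gunn is not specified here, and `find_sequence_gaps` is a hypothetical name.

```python
def find_sequence_gaps(global_seqs):
    """Return the sequence numbers missing between consecutive sorted values."""
    missing = []
    for previous, current in zip(global_seqs, global_seqs[1:]):
        if current - previous > 1:
            missing.extend(range(previous + 1, current))
    return missing
```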
Memory management and compaction utilities for the simulation core.
This module provides memory management capabilities including WorldState snapshots, log compaction, view cache eviction with LRU policy, and memory usage tracking.
- class gunn.utils.memory.MemoryStats(**data)[source]
Bases: BaseModel
Memory usage statistics.
- Parameters:
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class gunn.utils.memory.MemoryConfig(**data)[source]
Bases: BaseModel
Configuration for memory management.
- Parameters:
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class gunn.utils.memory.WorldStateSnapshot(**data)[source]
Bases: BaseModel
Snapshot of world state at a specific point in time.
- Parameters:
global_seq (int)
sim_time (float)
wall_time (float)
world_state (WorldState)
checksum (str)
- world_state: WorldState
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class gunn.utils.memory.ViewCache(max_size=1000)[source]
Bases: object
LRU cache for agent views with configurable size limits.
- Parameters:
max_size (int)
- get(key)[source]
Get cached view and mark it as recently used.
- Args:
key: Cache key (typically agent_id:view_seq)
- Returns:
Cached view or None if not found
- put(key, value)[source]
Store view in cache with LRU eviction.
- Args:
key: Cache key
value: View to cache
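An LRU cache with this `get`/`put` interface is often built on `collections.OrderedDict`. The following is a generic sketch of that technique, assuming nothing about how ViewCache is implemented internally; `LRUCacheSketch` is a hypothetical name.

```python
from collections import OrderedDict


class LRUCacheSketch:
    """Least-recently-used cache: reads and writes move a key to the 'fresh' end."""

    def __init__(self, max_size=1000):
        self.max_size = max_size
        self._data = OrderedDict()

    def get(self, key):
        if key not in self._data:
            return None
        self._data.move_to_end(key)  # mark as most recently used
        return self._data[key]

    def put(self, key, value):
        if key in self._data:
            self._data.move_to_end(key)
        self._data[key] = value
        if len(self._data) > self.max_size:
            self._data.popitem(last=False)  # evict the least recently used entry
```

Reading a key protects it from the next eviction, which is what distinguishes LRU from simple first-in-first-out eviction.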
- class gunn.utils.memory.SnapshotManager(config)[source]
Bases: object
Manages WorldState snapshots for faster replay.
- Parameters:
config (MemoryConfig)
- async create_snapshot(global_seq, world_state, sim_time=0.0)[source]
Create a new WorldState snapshot.
- Return type:
- Parameters:
global_seq (int)
world_state (WorldState)
sim_time (float)
- Args:
global_seq: Global sequence number for this snapshot
world_state: Current world state to snapshot
sim_time: Simulation time
- Returns:
Created snapshot
- find_nearest_snapshot(target_seq)[source]
Find the snapshot closest to but not exceeding the target sequence.
- Return type:
- Parameters:
target_seq (int)
- Args:
target_seq: Target global sequence number
- Returns:
Nearest snapshot or None if no suitable snapshot exists
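Finding the closest snapshot at or before a target sequence is a natural fit for binary search over sorted sequence numbers. The sketch below works on plain integers rather than WorldStateSnapshot objects; the function name and the use of `bisect` are assumptions, not a description of SnapshotManager's internals.

```python
import bisect


def nearest_at_or_before(snapshot_seqs, target_seq):
    """Return the largest value in sorted snapshot_seqs that is <= target_seq."""
    index = bisect.bisect_right(snapshot_seqs, target_seq)
    return snapshot_seqs[index - 1] if index else None
```

Replay can then start from the returned snapshot and apply only the log entries after it.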
- get_all_snapshots()[source]
Get all snapshots ordered by global_seq.
- Return type:
- Returns:
List of all snapshots
- async validate_snapshot_integrity(snapshot)[source]
Validate snapshot integrity using its checksum.
- Return type:
- Parameters:
snapshot (WorldStateSnapshot)
- Args:
snapshot: Snapshot to validate
- Returns:
True if the snapshot is valid
- class gunn.utils.memory.MemoryManager(config)[source]
Bases: object
Central memory management for the simulation core.
Handles WorldState snapshots, log compaction, view cache eviction, and memory usage monitoring with configurable limits.
Requirements addressed:
- 7.3: WorldState snapshot creation every N events for faster replay
- 11.4: Memory limits and compaction correctness
- Parameters:
config (MemoryConfig)
- async check_and_create_snapshot(current_seq, world_state, sim_time=0.0)[source]
Check whether a snapshot should be created and create it if needed.
- Return type:
- Parameters:
current_seq (int)
world_state (WorldState)
sim_time (float)
- Args:
current_seq: Current global sequence number
world_state: Current world state
sim_time: Current simulation time
- Returns:
Created snapshot or None if no snapshot was needed
- async compact_log(event_log)[source]
Compact old log entries while preserving replay capability.
Removes old entries from the log while ensuring that replay capability is preserved through snapshots. Only compacts if there are sufficient snapshots to maintain replay integrity.
- Args:
event_log: Event log to compact
- Returns:
Number of entries removed
- evict_old_views(max_age_seconds=3600.0)[source]
Remove old cached views to free memory.
- Args:
max_age_seconds: Maximum age for cached views
- Returns:
Number of views evicted
- async check_memory_limits(event_log)[source]
Check whether memory limits are exceeded and trigger compaction if needed.
- Args:
event_log: Event log to check
- Returns:
True if memory is within limits
- estimate_memory_usage(event_log)[source]
Return current memory usage statistics.
- Return type:
- Parameters:
event_log (EventLog)
- Args:
event_log: Event log to analyze
- Returns:
Memory usage statistics
gunn.utils.metrics_exporter module
Metrics exporter with feature flag status.
This module provides functionality to export feature flag status and other configuration metrics for operational visibility.
- class gunn.utils.metrics_exporter.MetricsExporter(config)[source]
Bases: object
Exports configuration and feature flag metrics.
- Parameters:
config (Config)
- export_feature_flags(features)[source]
Export feature flag status as metrics.
- Return type:
- Parameters:
features (FeatureFlags)
- Args:
features: Feature flags to export
- export_config_info(config)[source]
Export configuration information as metrics.
- Args:
config: Configuration to export
- record_startup_time(startup_duration)[source]
Record service startup time.
- Args:
startup_duration: Time taken to start the service in seconds
- record_config_validation_time(validation_duration)[source]
Record configuration validation time.
- Args:
validation_duration: Time taken to validate config in seconds
gunn.utils.scheduling module
Scheduling utilities for fair intent processing.
This module provides scheduling algorithms for fair processing of intents across multiple agents, including weighted round robin for fairness.
- class gunn.utils.scheduling.IntentScheduler(*args, **kwargs)[source]
Bases: Protocol
Protocol for intent scheduling algorithms.
- enqueue(intent, priority=0)[source]
Enqueue an intent for processing.
- Args:
intent: Intent to enqueue
priority: Priority level (higher = more important)
- dequeue()[source]
Dequeue the next intent for processing.
- Returns:
Next intent to process, or None if queue is empty
- class gunn.utils.scheduling.WeightedRoundRobinScheduler(default_weight=1, max_queue_depth=100, priority_levels=3)[source]
Bases: object
Weighted Round Robin scheduler for fair intent processing.
Provides fair scheduling across agents with configurable weights and priority levels. Ensures no agent can monopolize processing while respecting priority and fairness constraints.
Requirements addressed:
- 3.4: Priority and fairness policies
- 9: Fairness (Weighted Round Robin) in intent validation pipeline
- set_agent_weight(agent_id, weight)[source]
Set weight for a specific agent.
- Args:
agent_id: Agent identifier
weight: Weight for this agent (higher = more processing time)
- Raises:
ValueError: If weight is not positive
- enqueue(intent, priority=0)[source]
Enqueue an intent for processing.
- Args:
intent: Intent to enqueue
priority: Priority level (0 = highest priority)
- Raises:
ValueError: If priority is invalid or queue is full
- dequeue()[source]
Dequeue the next intent using weighted round robin.
- Returns:
Next intent to process, or None if all queues are empty
- get_queue_depth(agent_id=None)[source]
Get queue depth for a specific agent or the total.
- Args:
agent_id: Agent ID to check, or None for total depth
- Returns:
Queue depth
- get_agent_queue_depths()[source]
Get queue depths for all agents.
- Returns:
Dictionary mapping agent_id to queue depth
- get_priority_distribution(agent_id)[source]
Get distribution of intents by priority for an agent.
- Args:
agent_id: Agent identifier
- Returns:
Dictionary mapping priority to count
- get_stats()[source]
Get scheduler statistics.
- Returns:
Dictionary with comprehensive scheduler statistics
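The core of weighted round robin is that each agent may dequeue up to its weight in items before the turn passes to the next agent. The sketch below shows only that rotation and deliberately ignores priority levels and queue depth limits. It is not gunn's scheduler: the class name is hypothetical, and unlike the documented `enqueue(intent, priority)`, this `enqueue` takes the agent id explicitly.

```python
from collections import deque


class WeightedRoundRobinSketch:
    """Per-agent FIFO queues served in rotation, `weight` items per turn."""

    def __init__(self, default_weight=1):
        self.default_weight = default_weight
        self.weights = {}      # agent_id -> weight
        self.queues = {}       # agent_id -> deque of intents
        self._order = deque()  # rotation of agent ids; head is the current agent
        self._credit = 0       # dequeues left in the current agent's turn

    def set_agent_weight(self, agent_id, weight):
        if weight <= 0:
            raise ValueError("weight must be positive")
        self.weights[agent_id] = weight

    def enqueue(self, agent_id, intent):
        if agent_id not in self.queues:
            self.queues[agent_id] = deque()
            self._order.append(agent_id)
        self.queues[agent_id].append(intent)

    def _next_agent(self):
        self._order.rotate(-1)
        self._credit = 0

    def dequeue(self):
        # Visit each agent at most once looking for pending work.
        for _ in range(len(self._order)):
            agent_id = self._order[0]
            queue = self.queues[agent_id]
            if not queue:
                self._next_agent()
                continue
            if self._credit <= 0:
                self._credit = self.weights.get(agent_id, self.default_weight)
            item = queue.popleft()
            self._credit -= 1
            if self._credit == 0 or not queue:
                self._next_agent()
            return item
        return None
```

An agent with weight 2 gets two consecutive dequeues per round while a weight-1 agent gets one, so a busy agent cannot starve the others.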
- class gunn.utils.scheduling.PriorityScheduler(priority_levels=3, max_queue_depth=1000)[source]
Bases: object
Simple priority-based scheduler without fairness.
Processes intents strictly by priority level. This can starve lower-priority intents, but it ensures that the highest-priority intents are always processed first.
- enqueue(intent, priority=0)[source]
Enqueue intent by priority.
- Args:
intent: Intent to enqueue
priority: Priority level (0 = highest)
- Raises:
ValueError: If priority is invalid or queue is full
- dequeue()[source]
Dequeue highest priority intent.
- Returns:
Next intent to process, or None if all queues are empty
gunn.utils.telemetry module
Telemetry utilities for logging, metrics, and tracing.
This module provides centralized observability infrastructure including:
- Structured logging with PII redaction
- Prometheus metrics collection
- OpenTelemetry tracing setup
- Performance measurement utilities
- Memory usage tracking and reporting
- Bandwidth/CPU measurement for patch operations
- gunn.utils.telemetry.redact_pii(text)[source]
Redact personally identifiable information from text.
- Args:
text: Input text that may contain PII
- Returns:
Text with PII patterns replaced with [REDACTED_<type>], or the original input if it is not a string
- Example:
>>> redact_pii("Contact john@example.com or call 555-123-4567")
'Contact [REDACTED_EMAIL] or call [REDACTED_PHONE]'
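Pattern-based redaction of this kind is usually a table of labeled regular expressions applied in turn. The sketch below reproduces the documented example output, but the two patterns are illustrative assumptions: the expressions gunn actually uses, and the full set of PII types it covers, are not shown in this reference.

```python
import re

# Hypothetical patterns; real-world PII detection needs broader coverage.
_PII_PATTERNS = {
    "EMAIL": re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}"),
    "PHONE": re.compile(r"\b\d{3}[-.\s]\d{3}[-.\s]\d{4}\b"),
}


def redact_pii_sketch(text):
    """Replace each matched pattern with [REDACTED_<type>]; pass non-strings through."""
    if not isinstance(text, str):
        return text
    for label, pattern in _PII_PATTERNS.items():
        text = pattern.sub(f"[REDACTED_{label}]", text)
    return text
```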
- gunn.utils.telemetry.pii_redaction_processor(logger, method_name, event_dict)[source]
Structlog processor to redact PII from log events.
- Args:
logger: Logger instance (unused)
method_name: Log method name (unused)
event_dict: Event dictionary to process
- Returns:
Event dictionary with PII redacted from string values
- gunn.utils.telemetry.setup_logging(log_level='INFO', enable_pii_redaction=True)[source]
Initialize structured logging with PII redaction.
- Args:
log_level: Logging level (DEBUG, INFO, WARNING, ERROR)
enable_pii_redaction: Whether to enable PII redaction processor
- gunn.utils.telemetry.setup_tracing(service_name='gunn', otlp_endpoint=None, enable_fastapi_instrumentation=True)[source]
Initialize OpenTelemetry tracing.
- Return type:
- Parameters:
- Args:
service_name: Name of the service for tracing
otlp_endpoint: OTLP endpoint URL (if None, uses console exporter)
enable_fastapi_instrumentation: Whether to enable FastAPI auto-instrumentation
- gunn.utils.telemetry.get_tracer(name)[source]
Get OpenTelemetry tracer for a component.
- Return type:
Tracer
- Parameters:
name (str)
- Args:
name: Tracer name (typically module name)
- Returns:
Tracer instance
- gunn.utils.telemetry.get_logger(name, **context)[source]
Get a structured logger with optional context.
- Args:
name: Logger name
**context: Additional context to bind to logger
- Returns:
Bound logger with context
- gunn.utils.telemetry.log_operation(logger, operation, status='success', global_seq=None, view_seq=None, agent_id=None, req_id=None, latency_ms=None, **extra_context)[source]
Log an operation with standardized fields for observability.
- Return type:
- Parameters:
- Args:
logger: Structured logger instance
operation: Operation name
status: Operation status (success, error, warning)
global_seq: Global sequence number
view_seq: View sequence number
agent_id: Agent identifier
req_id: Request identifier
latency_ms: Operation latency in milliseconds
**extra_context: Additional context fields
- class gunn.utils.telemetry.PerformanceTimer(operation, agent_id=None, req_id=None, global_seq=None, view_seq=None, logger=None, record_metrics=True, create_span=True, tracer_name='gunn.performance')[source]
Bases: object
Context manager for measuring operation performance.
Automatically records metrics, logs timing information, and creates tracing spans.
- Parameters:
- gunn.utils.telemetry.async_performance_timer(operation, agent_id=None, req_id=None, global_seq=None, view_seq=None, logger=None, record_metrics=True, create_span=True, tracer_name='gunn.performance')[source]
Async context manager for measuring operation performance.
- Return type:
- Parameters:
- Args:
operation: Operation name for metrics/logging
agent_id: Agent identifier (optional)
req_id: Request identifier (optional)
global_seq: Global sequence number (optional)
view_seq: View sequence number (optional)
logger: Logger instance (optional)
record_metrics: Whether to record Prometheus metrics
create_span: Whether to create tracing span
tracer_name: Tracer name for spans
- Yields:
PerformanceTimer instance
- gunn.utils.telemetry.record_queue_depth(agent_id, depth)[source]
Record queue depth metric for an agent.
- Args:
agent_id: Agent identifier
depth: Current queue depth
- gunn.utils.telemetry.record_cancellation(agent_id, reason)[source]
Record a cancellation event.
- Args:
agent_id: Agent identifier
reason: Cancellation reason
- gunn.utils.telemetry.record_queue_high_watermark(agent_id, queue_type, depth)[source]
Record queue depth high watermark for backpressure monitoring.
- Args:
agent_id: Agent identifier
queue_type: Type of queue (agent_queue, system_queue, etc.)
depth: Queue depth that triggered high watermark
- gunn.utils.telemetry.record_backpressure_event(agent_id, queue_type, policy)[source]
Record a backpressure event.
- Args:
agent_id: Agent identifier
queue_type: Type of queue that triggered backpressure
policy: Backpressure policy applied (defer, shed_oldest, drop_newest)
- gunn.utils.telemetry.record_circuit_breaker_state_change(component, from_state, to_state)[source]
Record circuit breaker state change.
- Args:
component: Component name with circuit breaker
from_state: Previous state (CLOSED, OPEN, HALF_OPEN)
to_state: New state (CLOSED, OPEN, HALF_OPEN)
- gunn.utils.telemetry.record_error_recovery_action(error_type, recovery_action, agent_id='unknown')[source]
Record an error recovery action.
- Args:
error_type: Type of error that occurred
recovery_action: Recovery action taken
agent_id: Agent identifier (optional)
- gunn.utils.telemetry.record_intent_throughput(agent_id, intent_kind, status='success')[source]
Record intent processing throughput.
- Args:
agent_id: Agent identifier
intent_kind: Type of intent (Speak, Move, etc.)
status: Processing status (success, error, conflict)
- gunn.utils.telemetry.record_conflict(agent_id, conflict_type)[source]
Record an intent conflict.
- Args:
agent_id: Agent identifier
conflict_type: Type of conflict (staleness, validation, quota)
- gunn.utils.telemetry.record_observation_delivery_latency(agent_id, latency_seconds)[source]
Record observation delivery latency.
- Args:
agent_id: Agent identifier
latency_seconds: Time from effect creation to observation delivery
- gunn.utils.telemetry.update_global_seq(seq)[source]
Update the current global sequence number metric.
- Args:
seq: Current global sequence number
- gunn.utils.telemetry.update_view_seq(agent_id, seq)[source]
Update the current view sequence number for an agent.
- Args:
agent_id: Agent identifier
seq: Current view sequence number
- gunn.utils.telemetry.update_active_agents_count(count)[source]
Update the number of active agents.
- Args:
count: Number of currently active agents
- gunn.utils.telemetry.start_metrics_server(port=8000)[source]
Start Prometheus metrics HTTP server.
- Args:
port: Port to serve metrics on
- class gunn.utils.telemetry.MonotonicClock[source]
Bases: object
Monotonic clock for internal timing measurements.
Uses the asyncio event loop's monotonic time for consistent timing that is not affected by system clock adjustments.
- gunn.utils.telemetry.get_timing_context()[source]
Get current timing context for logging.
- Returns:
Dictionary with monotonic_time and wall_time
- class gunn.utils.telemetry.SystemMonitor[source]
Bases: object
Monitor system resources and record metrics.
- record_memory_usage(component='system')[source]
Record current memory usage and return bytes used.
- Args:
component: Component name for metrics labeling
- Returns:
Memory usage in bytes
- class gunn.utils.telemetry.BandwidthMonitor[source]
Bases: object
Monitor bandwidth usage for patch operations and other data transfers.
- record_patch_bandwidth(agent_id, patch_size_bytes, operation_count, is_fallback=False)[source]
Record bandwidth usage for patch operations.
- Args:
agent_id: Agent identifier
patch_size_bytes: Size of patch data in bytes
operation_count: Number of patch operations
is_fallback: Whether this was a fallback to full snapshot
gunn.utils.timing module
TimedQueue implementation for latency simulation.
This module provides a priority queue for timed delivery with latency simulation, supporting concurrent operations and precise timing control.
- class gunn.utils.timing.TimedQueue[source]
Bases: object
Priority queue for timed delivery with latency simulation.
Supports scheduling items for delivery at specific times. Put operations are protected by a lock, while get operations sleep without holding the lock so that they do not block other operations.
Requirements addressed:
- 6.4: ObservationDelta delivery latency (core in-proc) ≤ 20ms
- 6.5: Timed delivery using per-agent TimedQueues with latency models
- 4.7: Non-blocking operations per agent
- async put_at(deliver_at, item)[source]
Schedule item for delivery at a specific time.
- Args:
deliver_at: Absolute time (from asyncio.get_running_loop().time()) when the item should be delivered
item: Item to deliver
- Raises:
RuntimeError: If queue is closed
- async put_in(delay_seconds, item)[source]
Schedule item for delivery after the specified delay.
- Args:
delay_seconds: Delay in seconds from now
item: Item to deliver
- async get()[source]
Get next item when its delivery time arrives.
Uses lock-free sleep to avoid blocking other operations.
- Return type:
- Returns:
The next item when its delivery time arrives
- Raises:
RuntimeError: If queue is closed and empty
- get_nowait()[source]
Get next item if it is ready for delivery now.
- Return type:
- Returns:
The next item if ready, otherwise raises asyncio.QueueEmpty
- Raises:
asyncio.QueueEmpty: If no items are ready for delivery
RuntimeError: If queue is closed and empty
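A timed delivery queue of this shape can be sketched with a heap keyed on delivery time. The following is an independent illustration, not gunn's TimedQueue: it omits closing and the RuntimeError cases, and the polling interval in `get` and the tie-breaking counter are assumptions. Only the put side takes the lock; `get` sleeps without holding it.

```python
import asyncio
import heapq
import itertools


class TimedQueueSketch:
    """Min-heap of (deliver_at, tie_breaker, item) using the event loop clock."""

    def __init__(self):
        self._heap = []
        self._counter = itertools.count()  # keeps equal-time items in FIFO order
        self._lock = asyncio.Lock()

    async def put_at(self, deliver_at, item):
        async with self._lock:
            heapq.heappush(self._heap, (deliver_at, next(self._counter), item))

    async def put_in(self, delay_seconds, item):
        now = asyncio.get_running_loop().time()
        await self.put_at(now + delay_seconds, item)

    def get_nowait(self):
        now = asyncio.get_running_loop().time()
        if not self._heap or self._heap[0][0] > now:
            raise asyncio.QueueEmpty
        return heapq.heappop(self._heap)[2]

    async def get(self, poll_interval=0.01):
        loop = asyncio.get_running_loop()
        while True:
            wait = poll_interval
            if self._heap:
                wait = self._heap[0][0] - loop.time()
                if wait <= 0:
                    return heapq.heappop(self._heap)[2]
            # Sleep without holding the lock; cap the sleep so that an item
            # scheduled earlier in the meantime is still picked up promptly.
            await asyncio.sleep(min(wait, poll_interval))
```

Items come out in delivery-time order regardless of insertion order, which is what lets a latency model delay one observation without holding up an earlier one.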
gunn.utils.timing_demo module
Module contents
- exception gunn.utils.BackpressureError(agent_id, queue_type, current_depth, threshold, policy='defer')[source]
Bases: SimulationError
Error raised when backpressure limits are exceeded.
This error occurs when queue depths or processing rates exceed configured thresholds, triggering backpressure policies.
Requirements addressed:
- 10.2: Backpressure policies (defer, shed oldest, drop newest)
- 10.5: Queue depth monitoring and backpressure triggers
- class gunn.utils.BackpressureManager[ソース]¶
ベースクラス:
object
Manager for applying backpressure policies to queues.
Provides a unified interface for managing different backpressure policies and applying them consistently across the system.
Requirements addressed: - 10.2: Configurable backpressure policies per agent class - 10.5: Queue depth monitoring and backpressure triggers
- create_policy(policy_name, threshold, agent_id='unknown')[source]¶
Create a backpressure policy instance.
- Args:
policy_name: Name of the policy (defer, shed_oldest, drop_newest)
threshold: Queue depth threshold
agent_id: Agent identifier for metrics
- Returns:
BackpressurePolicy instance
- Raises:
ValueError: If policy_name is not recognized
- class gunn.utils.BackpressureQueue(policy, maxsize=0, queue_type='queue')[source]¶
Bases: Generic
Queue with integrated backpressure policy support.
A queue implementation that automatically applies backpressure policies when depth thresholds are exceeded.
Requirements addressed: - 10.2: Backpressure policies integrated with queue operations - 10.5: Automatic backpressure triggers based on queue depth
- Parameters:
policy (BackpressurePolicy)
maxsize (int)
queue_type (str)
- async get()[source]¶
Get an item from the queue.
- Return type:
TypeVar(T)
- Returns:
Next item from queue
- Raises:
asyncio.QueueEmpty: If queue is empty
- class gunn.utils.CircuitBreaker(failure_threshold=5, recovery_timeout=30.0, half_open_max_calls=3)[source]¶
Bases: object
Circuit breaker for fault tolerance with failure thresholds.
Implements the circuit breaker pattern to prevent cascading failures by temporarily blocking operations when failure rates exceed thresholds.
Requirements addressed: - 10.5: Circuit breaker patterns per agent or adapter
- async async_call(func, *args, **kwargs)[source]¶
Execute async function with circuit breaker protection.
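The circuit breaker pattern can be sketched in a few lines. This is a simplified illustration, not gunn's CircuitBreaker: the class name MiniCircuitBreaker is hypothetical, a plain RuntimeError stands in for CircuitBreakerOpenError, and the half-open call limit (half_open_max_calls) is left out.

```python
import asyncio
import time


class MiniCircuitBreaker:
    """Simplified circuit breaker (hypothetical sketch, no half-open call limit)."""

    def __init__(self, failure_threshold=5, recovery_timeout=30.0):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0
        self.opened_at = None  # monotonic timestamp of when the circuit opened

    async def async_call(self, func, *args, **kwargs):
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.recovery_timeout:
                raise RuntimeError("circuit open")  # stand-in for CircuitBreakerOpenError
            self.opened_at = None  # timeout elapsed: let one trial call through
        try:
            result = await func(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = time.monotonic()
            raise
        self.failures = 0  # any success resets the failure count
        return result


async def flaky():
    raise ValueError("boom")


async def demo():
    breaker = MiniCircuitBreaker(failure_threshold=2, recovery_timeout=60.0)
    outcomes = []
    for _ in range(3):
        try:
            await breaker.async_call(flaky)
        except ValueError:
            outcomes.append("failed")
        except RuntimeError:
            outcomes.append("rejected")
    return outcomes
```

In demo(), the first two calls fail and trip the breaker, so the third call is rejected without invoking flaky() at all.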
- exception gunn.utils.CircuitBreakerOpenError(component, failure_count, threshold)[source]¶
Bases: SimulationError
Error raised when circuit breaker is open.
This error occurs when a circuit breaker is in the open state, preventing operations from being executed to avoid cascading failures.
Requirements addressed: - 10.5: Circuit breaker patterns per agent or adapter
- class gunn.utils.DeferPolicy(threshold, agent_id='unknown')[source]¶
Bases: BackpressurePolicy, Generic
Defer policy - block new items when threshold exceeded.
This is the default policy that raises BackpressureError to defer processing until queue depth decreases below threshold.
Requirements addressed: - 10.2: Backpressure policies with defer as default
- class gunn.utils.DropNewestPolicy(threshold, agent_id='unknown')[source]¶
Bases: BackpressurePolicy, Generic
Drop newest policy - drop new items when threshold exceeded.
When queue exceeds threshold, drops the new item instead of adding it, preserving existing queued items.
Requirements addressed: - 10.2: Backpressure policies with drop newest option
- class gunn.utils.ErrorRecoveryPolicy(max_retries=3, retry_delay_ms=100.0, backoff_multiplier=2.0, max_delay_ms=5000.0)[source]¶
Bases: object
Policy for handling different types of errors with recovery strategies.
Provides configurable recovery strategies for different error types to enable graceful degradation and automatic recovery.
Requirements addressed: - 10.4: Partial failures remain consistent with event log - 10.5: Circuit breaker patterns and error recovery
- calculate_retry_delay(attempt)[source]¶
Calculate retry delay with exponential backoff.
Calculate retry delay with exponential backoff.
- Args:
attempt: Retry attempt number (0-based)
- Returns:
Delay in milliseconds
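As a rough illustration of exponential backoff with the constructor defaults shown above, the sketch below assumes the common formula min(retry_delay_ms * backoff_multiplier ** attempt, max_delay_ms). The source does not state the exact formula, and the function name backoff_delay_ms is hypothetical.

```python
def backoff_delay_ms(attempt, retry_delay_ms=100.0, backoff_multiplier=2.0, max_delay_ms=5000.0):
    """Capped exponential backoff (assumed formula, 0-based attempt number)."""
    return min(retry_delay_ms * backoff_multiplier ** attempt, max_delay_ms)


# Attempts 0..6 give 100, 200, 400, 800, 1600, 3200 and then the 5000 ms cap.
delays = [backoff_delay_ms(attempt) for attempt in range(7)]
```

Under this assumption the delay doubles on each attempt until it reaches max_delay_ms, after which it stays constant.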
- handle_backpressure(error)[source]¶
Handle backpressure error.
- Parameters:
error (BackpressureError)
- Args:
error: The backpressure error
- Returns:
Recovery action based on backpressure policy
- handle_circuit_breaker(error)[source]¶
Handle circuit breaker open error.
- Parameters:
error (CircuitBreakerOpenError)
- Args:
error: The circuit breaker error
- Returns:
Recovery action (typically RETRY_WITH_DELAY)
- handle_intent_conflict(error)[source]¶
Handle intent conflict error.
- Parameters:
error (IntentConflictError)
- Args:
error: The intent conflict error
- Returns:
Recovery action (typically RETRY_WITH_DELAY)
- handle_quota_exceeded(error)[source]¶
Handle quota exceeded error.
- Parameters:
error (QuotaExceededError)
- Args:
error: The quota exceeded error
- Returns:
Recovery action (typically DEFER)
- handle_stale_context(error)[source]¶
Handle stale context error.
- Parameters:
error (StaleContextError)
- Args:
error: The stale context error
- Returns:
Recovery action (typically REGENERATE)
- handle_timeout(error)[source]¶
Handle timeout error.
- Parameters:
error (TimeoutError)
- Args:
error: The timeout error
- Returns:
Recovery action (typically ABORT)
- handle_validation(error)[source]¶
Handle validation error.
- Parameters:
error (ValidationError)
- Args:
error: The validation error
- Returns:
Recovery action (typically ABORT)
- should_retry(error, attempt)[source]¶
Determine if error should be retried.
- Parameters:
error (SimulationError)
attempt (int)
- Args:
error: The error that occurred
attempt: Current attempt number (0-based)
- Returns:
True if should retry, False otherwise
- exception gunn.utils.IntentConflictError(intent, conflicting_effects)[source]¶
Bases: SimulationError
Error raised when intent conflicts with existing effects.
This error occurs when an intent cannot be executed due to conflicts with recently processed effects or other intents.
Requirements addressed: - 3.4: Intent conflict resolution - 10.3: Structured error codes including INTENT_CONFLICT
- class gunn.utils.MemoryConfig(**data)[source]¶
Bases: BaseModel
Configuration for memory management.
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- class gunn.utils.MemoryManager(config)[source]¶
Bases: object
Central memory management for the simulation core.
Handles WorldState snapshots, log compaction, view cache eviction, and memory usage monitoring with configurable limits.
Requirements addressed: - 7.3: WorldState snapshot creation every N events for faster replay - 11.4: Memory limits and compaction correctness
- Parameters:
config (MemoryConfig)
- async check_and_create_snapshot(current_seq, world_state, sim_time=0.0)[source]¶
Check if a snapshot should be created and create it if needed.
- Parameters:
current_seq (int)
world_state (WorldState)
sim_time (float)
- Args:
current_seq: Current global sequence number
world_state: Current world state
sim_time: Current simulation time
- Returns:
Created snapshot or None if no snapshot was needed
- async check_memory_limits(event_log)[source]¶
Check if memory limits are exceeded and trigger compaction if needed.
- Args:
event_log: Event log to check
- Returns:
True if memory is within limits
- async compact_log(event_log)[source]¶
Compact old log entries while preserving replay capability.
Removes old entries from the log only when enough snapshots exist to keep replay intact.
- Args:
event_log: Event log to compact
- Returns:
Number of entries removed
- estimate_memory_usage(event_log)[source]¶
Return current memory usage statistics.
- Parameters:
event_log (EventLog)
- Args:
event_log: Event log to analyze
- Returns:
Memory usage statistics
- class gunn.utils.MemoryStats(**data)[source]¶
Bases: BaseModel
Memory usage statistics.
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- class gunn.utils.PriorityScheduler(priority_levels=3, max_queue_depth=1000)[source]¶
Bases: object
Simple priority-based scheduler without fairness.
Processes intents strictly by priority level. This can starve lower-priority intents, but it ensures that the highest-priority intents are always processed first.
- dequeue()[source]¶
Dequeue highest priority intent.
- Returns:
Next intent to process, or None if all queues are empty
- enqueue(intent, priority=0)[source]¶
Enqueue intent by priority.
- Args:
intent: Intent to enqueue
priority: Priority level (0 = highest)
- Raises:
ValueError: If priority is invalid or queue is full
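Strict priority scheduling, including the starvation risk mentioned above, can be sketched as follows. The class name MiniPriorityScheduler is hypothetical, and treating max_queue_depth as a limit on the total number of queued intents is an assumption; the source does not say whether the limit is total or per level.

```python
from collections import deque


class MiniPriorityScheduler:
    """Strict priority scheduler sketch (hypothetical, not gunn's PriorityScheduler)."""

    def __init__(self, priority_levels=3, max_queue_depth=1000):
        self._queues = [deque() for _ in range(priority_levels)]
        self._max_queue_depth = max_queue_depth  # assumed to cap the total depth

    def enqueue(self, intent, priority=0):
        if not 0 <= priority < len(self._queues):
            raise ValueError(f"invalid priority: {priority}")
        if sum(len(q) for q in self._queues) >= self._max_queue_depth:
            raise ValueError("queue is full")
        self._queues[priority].append(intent)

    def dequeue(self):
        # Always scan from priority 0, so lower levels wait while higher ones have work.
        for queue in self._queues:
            if queue:
                return queue.popleft()
        return None


scheduler = MiniPriorityScheduler()
scheduler.enqueue("low", priority=2)
scheduler.enqueue("high", priority=0)
scheduler.enqueue("mid", priority=1)
order = [scheduler.dequeue() for _ in range(4)]
```

Here order ends with None because the fourth dequeue() finds every level empty.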
- exception gunn.utils.QuotaExceededError(agent_id, quota_type, limit, current=0)[source]¶
Bases: SimulationError
Error raised when agent exceeds quota limits.
This error occurs when an agent attempts to perform an action that would exceed their configured quota limits.
Requirements addressed: - 1.5: Quota limits and permissions - 10.3: Structured error codes including QUOTA_EXCEEDED
- class gunn.utils.RecoveryAction(*values)[source]¶
Bases: Enum
Recovery actions for error handling.
- RETRY = 'retry'¶
- ABORT = 'abort'¶
- REGENERATE = 'regenerate'¶
- RETRY_WITH_DELAY = 'retry_with_delay'¶
- MODIFY_INTENT = 'modify_intent'¶
- DEFER = 'defer'¶
- SHED_OLDEST = 'shed_oldest'¶
- DROP_NEWEST = 'drop_newest'¶
- class gunn.utils.ShedOldestPolicy(threshold, agent_id='unknown')[source]¶
Bases: BackpressurePolicy, Generic
Shed oldest policy - drop oldest items to make room for new ones.
When queue exceeds threshold, removes oldest items to make room for new items, maintaining queue size at threshold.
Requirements addressed: - 10.2: Backpressure policies with shed oldest option
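The shed-oldest behaviour can be illustrated with a plain deque. This is a sketch under assumptions, not the policy's actual handle_overflow() implementation: the helper name shed_oldest_put and its return value (the list of dropped items) are hypothetical.

```python
from collections import deque


def shed_oldest_put(queue, new_item, threshold):
    """Append new_item, first dropping the oldest items so the depth stays at threshold."""
    dropped = []
    while len(queue) >= threshold:
        dropped.append(queue.popleft())  # the oldest item sits at the left end
    queue.append(new_item)
    return dropped


queue = deque([1, 2, 3])
dropped = shed_oldest_put(queue, 4, threshold=3)
```

After the call, the oldest item (1) has been shed and the queue holds the three most recent items.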
- exception gunn.utils.SimulationError(message, recovery_action=RecoveryAction.ABORT)[source]¶
Bases: Exception
Base exception for simulation errors.
- Parameters:
message (str)
recovery_action (RecoveryAction)
- class gunn.utils.SnapshotManager(config)[source]¶
Bases: object
Manages WorldState snapshots for faster replay.
- Parameters:
config (MemoryConfig)
- async create_snapshot(global_seq, world_state, sim_time=0.0)[source]¶
Create a new WorldState snapshot.
- Parameters:
global_seq (int)
world_state (WorldState)
sim_time (float)
- Args:
global_seq: Global sequence number for this snapshot
world_state: Current world state to snapshot
sim_time: Simulation time
- Returns:
Created snapshot
- find_nearest_snapshot(target_seq)[source]¶
Find the snapshot closest to but not exceeding target sequence.
- Parameters:
target_seq (int)
- Args:
target_seq: Target global sequence number
- Returns:
Nearest snapshot or None if no suitable snapshot exists
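One way to implement "closest to but not exceeding" is a binary search over sorted sequence numbers. The sketch below works on bare integers rather than WorldStateSnapshot objects, and the function name find_nearest_seq is hypothetical; how SnapshotManager stores its snapshots is not stated in the source.

```python
from bisect import bisect_right


def find_nearest_seq(snapshot_seqs, target_seq):
    """Return the largest sequence number <= target_seq from a sorted list, or None."""
    index = bisect_right(snapshot_seqs, target_seq)
    return snapshot_seqs[index - 1] if index else None


snapshot_seqs = [100, 200, 300]
```

With these values, a target of 250 resolves to 200, a target of 300 resolves to 300 itself, and a target of 50 has no suitable snapshot.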
- get_all_snapshots()[source]¶
Get all snapshots ordered by global_seq.
- Returns:
List of all snapshots
- async validate_snapshot_integrity(snapshot)[source]¶
Validate snapshot integrity using checksum.
- Parameters:
snapshot (WorldStateSnapshot)
- Args:
snapshot: Snapshot to validate
- Returns:
True if snapshot is valid
- exception gunn.utils.StaleContextError(req_id, expected_seq, actual_seq, threshold=0)[source]¶
Bases: SimulationError
Error raised when intent context becomes stale.
This error occurs when an agent's context (view_seq) is outdated compared to the current world state, indicating the intent was based on stale information.
Requirements addressed: - 4.2: Staleness detection using latest_view_seq > context_seq + threshold - 10.3: Structured error codes including STALE_CONTEXT
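The staleness rule quoted in requirement 4.2 translates directly into a predicate. The function name is_stale is hypothetical, and how the error's expected_seq and actual_seq arguments map onto latest_view_seq and context_seq is not stated in the source.

```python
def is_stale(latest_view_seq, context_seq, threshold=0):
    """Requirement 4.2: stale when latest_view_seq > context_seq + threshold."""
    return latest_view_seq > context_seq + threshold
```

For example, a context at sequence 8 is stale against a latest view of 10 with the default threshold of 0, but not with a threshold of 2.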
- class gunn.utils.TimedQueue[source]¶
Bases: object
Priority queue for timed delivery with latency simulation.
Supports scheduling items for delivery at specific times with proper locking for put operations and lock-free sleep for get operations to avoid blocking.
Requirements addressed: - 6.4: ObservationDelta delivery latency (core in-proc) ≤ 20ms - 6.5: Timed delivery using per-agent TimedQueues with latency models - 4.7: Non-blocking operations per agent
- async close()[source]¶
Close the queue, preventing new items from being added.
Existing get() operations will continue until the queue is empty.
- async get()[source]¶
Get next item when its delivery time arrives.
Uses lock-free sleep to avoid blocking other operations.
- Returns:
The next item when its delivery time arrives
- Raises:
RuntimeError: If queue is closed and empty
- get_nowait()[source]¶
Get next item if it's ready for delivery now.
- Returns:
The next item if ready, otherwise raises asyncio.QueueEmpty
- Raises:
asyncio.QueueEmpty: If no items are ready for delivery
RuntimeError: If queue is closed and empty
- peek_next_delivery_time()[source]¶
Get the delivery time of the next item without removing it.
- Returns:
Delivery time of next item, or None if queue is empty
- async put_at(deliver_at, item)[source]¶
Schedule item for delivery at a specific time.
- Args:
deliver_at: Absolute time (from asyncio.get_running_loop().time()) when the item should be delivered
item: Item to deliver
- Raises:
RuntimeError: If queue is closed
- exception gunn.utils.TimeoutError(operation, deadline_ms, actual_ms)[source]¶
Bases: SimulationError
Error raised when operations exceed deadline.
This error occurs when intent processing or generation takes longer than the configured deadline.
Requirements addressed: - 3.5: Deadline enforcement for intent processing - 10.3: Structured error codes including TIMEOUT
- exception gunn.utils.ValidationError(intent, validation_failures)[source]¶
Bases: SimulationError
Error raised when intent validation fails.
This error occurs when an intent fails validation checks before being converted to an effect.
Requirements addressed: - 1.5: Intent validation through EffectValidator - 3.4: Validation for quota limits, cooldowns, and permissions
- class gunn.utils.ViewCache(max_size=1000)[source]¶
Bases: object
LRU cache for agent views with configurable size limits.
- Parameters:
max_size (int)
- evict_old_entries(max_age_seconds=3600.0)[source]¶
Evict entries older than specified age.
- Args:
max_age_seconds: Maximum age for cached entries
- Returns:
Number of entries evicted
- get(key)[source]¶
Get cached view and mark as recently used.
- Args:
key: Cache key (typically agent_id:view_seq)
- Returns:
Cached view or None if not found
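The "mark as recently used" behaviour of an LRU cache can be sketched with OrderedDict. This is an illustration only: the class name MiniViewCache and the put() method are hypothetical (no insertion method is shown in this section), and age-based eviction is omitted.

```python
from collections import OrderedDict


class MiniViewCache:
    """Minimal LRU cache sketch (hypothetical, not gunn's ViewCache)."""

    def __init__(self, max_size=1000):
        self._max_size = max_size
        self._entries = OrderedDict()  # least recently used entry comes first

    def get(self, key):
        if key not in self._entries:
            return None
        self._entries.move_to_end(key)  # mark as most recently used
        return self._entries[key]

    def put(self, key, view):
        self._entries[key] = view
        self._entries.move_to_end(key)
        while len(self._entries) > self._max_size:
            self._entries.popitem(last=False)  # evict the least recently used entry


cache = MiniViewCache(max_size=2)
cache.put("agent_a:1", "view-a")
cache.put("agent_b:1", "view-b")
cache.get("agent_a:1")            # touching agent_a makes agent_b the eviction candidate
cache.put("agent_c:1", "view-c")  # exceeds max_size, so agent_b's view is evicted
```

The keys follow the agent_id:view_seq convention mentioned in the get() description.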
- class gunn.utils.WeightedRoundRobinScheduler(default_weight=1, max_queue_depth=100, priority_levels=3)[source]¶
Bases: object
Weighted Round Robin scheduler for fair intent processing.
Provides fair scheduling across agents with configurable weights and priority levels. Ensures no agent can monopolize processing while respecting priority and fairness constraints.
Requirements addressed: - 3.4: Priority and fairness policies - 9: Fairness (Weighted Round Robin) in intent validation pipeline
- clear()[source]¶
Clear all queues and return number of dropped intents.
- Returns:
Total number of intents that were dropped
- dequeue()[source]¶
Dequeue the next intent using weighted round robin.
- Returns:
Next intent to process, or None if all queues are empty
- enqueue(intent, priority=0)[source]¶
Enqueue an intent for processing.
- Args:
intent: Intent to enqueue
priority: Priority level (0 = highest priority)
- Raises:
ValueError: If priority is invalid or queue is full
- get_agent_queue_depths()[source]¶
Get queue depths for all agents.
- Returns:
Dictionary mapping agent_id to queue depth
- get_priority_distribution(agent_id)[source]¶
Get distribution of intents by priority for an agent.
- Args:
agent_id: Agent identifier
- Returns:
Dictionary mapping priority to count
- get_queue_depth(agent_id=None)[source]¶
Get queue depth for specific agent or total.
- Args:
agent_id: Agent ID to check, or None for total depth
- Returns:
Queue depth
- get_stats()[source]¶
Get scheduler statistics.
- Returns:
Dictionary with comprehensive scheduler statistics
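Weighted round robin can be sketched as a rotation of per-agent queues in which each agent may dequeue up to its weight before the turn passes on. This is a simplified illustration, not gunn's scheduler: the class name, set_weight(), and the explicit agent_id argument to enqueue() are assumptions, and priority levels and depth limits are omitted.

```python
from collections import deque


class MiniWeightedRoundRobin:
    """Weighted round robin sketch (hypothetical; no priorities or depth limits)."""

    def __init__(self, default_weight=1):
        self._default_weight = default_weight
        self._weights = {}     # agent_id -> weight
        self._queues = {}      # agent_id -> deque of intents
        self._order = deque()  # rotation of agent_ids; the front agent holds the turn
        self._credit = 0       # dequeues left in the front agent's turn

    def set_weight(self, agent_id, weight):
        self._weights[agent_id] = weight

    def enqueue(self, agent_id, intent):
        if agent_id not in self._queues:
            self._queues[agent_id] = deque()
            self._order.append(agent_id)
        self._queues[agent_id].append(intent)

    def dequeue(self):
        for _ in range(len(self._order)):
            agent_id = self._order[0]
            if self._credit == 0:  # a new turn starts with a full credit
                self._credit = self._weights.get(agent_id, self._default_weight)
            queue = self._queues[agent_id]
            if queue:
                self._credit -= 1
                if self._credit == 0:
                    self._order.rotate(-1)  # turn used up: next agent moves to the front
                return queue.popleft()
            self._credit = 0            # nothing queued: forfeit the turn
            self._order.rotate(-1)
        return None


wrr = MiniWeightedRoundRobin()
wrr.set_weight("a", 2)
for intent in ("a1", "a2", "a3"):
    wrr.enqueue("a", intent)
for intent in ("b1", "b2"):
    wrr.enqueue("b", intent)
served = [wrr.dequeue() for _ in range(6)]
```

With weight 2 for agent a and the default weight 1 for agent b, a is served twice per round and b once, so neither agent can monopolize processing.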
- class gunn.utils.WorldStateSnapshot(**data)[source]¶
Bases: BaseModel
Snapshot of world state at a specific point in time.
- Parameters:
global_seq (int)
sim_time (float)
wall_time (float)
world_state (WorldState)
checksum (str)
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- world_state: WorldState¶
- gunn.utils.canonical_json(obj)[source]¶
Generate canonical JSON serialization for consistent hashing.
Uses orjson with sorted keys to ensure deterministic output regardless of dictionary key ordering.
- Args:
obj: Dictionary to serialize
- Returns:
Canonical JSON bytes representation
- Example:
>>> data = {"b": 2, "a": 1}
>>> canonical_json(data)
b'{"a":1,"b":2}'
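For readers without orjson installed, a standard-library approximation of the same idea is sketched below. The function name canonical_json_stdlib is hypothetical, and the output is only expected to match for simple inputs: json.dumps escapes non-ASCII characters by default, so bytes may differ from orjson's for such data.

```python
import json


def canonical_json_stdlib(obj):
    """Deterministic JSON bytes using sorted keys and compact separators (stdlib sketch)."""
    return json.dumps(obj, sort_keys=True, separators=(",", ":")).encode("utf-8")


first = canonical_json_stdlib({"b": 2, "a": 1})
second = canonical_json_stdlib({"a": 1, "b": 2})
```

Both calls produce the same bytes, which is the property that makes the result suitable as hashing input.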
- gunn.utils.chain_checksum(effect, prev_checksum=None)[source]¶
Generate hash chain checksum using SHA-256.
Creates a hash chain by combining the previous checksum with the canonical JSON representation of the current effect.
- Args:
effect: Effect dictionary to hash
prev_checksum: Previous checksum in the chain (None for first entry)
- Returns:
Hexadecimal SHA-256 hash string
- Example:
>>> effect = {"kind": "test", "payload": {}}
>>> chain_checksum(effect)
'a1b2c3...'
>>> chain_checksum(effect, "prev_hash")
'd4e5f6...'
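A hash chain link of this kind can be sketched with hashlib. How gunn combines the previous checksum with the effect is not specified in this section, so the byte layout below (previous checksum bytes followed by canonical JSON bytes) and the name chain_checksum_sketch are assumptions; the digests will not necessarily match the library's.

```python
import hashlib
import json


def chain_checksum_sketch(effect, prev_checksum=None):
    """SHA-256 over the previous checksum (if any) followed by canonical JSON of effect."""
    hasher = hashlib.sha256()
    if prev_checksum is not None:
        hasher.update(prev_checksum.encode("utf-8"))
    canonical = json.dumps(effect, sort_keys=True, separators=(",", ":"))
    hasher.update(canonical.encode("utf-8"))
    return hasher.hexdigest()


effect = {"kind": "test", "payload": {}}
first_link = chain_checksum_sketch(effect)
second_link = chain_checksum_sketch(effect, first_link)
```

Because each checksum depends on the one before it, the same effect hashes differently at different positions in the chain.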
- gunn.utils.detect_corruption(entries)[source]¶
Detect corrupted entries in a hash chain.
Returns indices of entries that have invalid checksums, indicating potential corruption or tampering.
- Args:
entries: List of log entries with 'effect' and 'checksum' fields
- Returns:
List of indices where corruption was detected
- Example:
>>> entries = [
...     {"effect": {"kind": "test1"}, "checksum": "valid_hash"},
...     {"effect": {"kind": "test2"}, "checksum": "invalid_hash"}
... ]
>>> detect_corruption(entries)
[1]
- gunn.utils.validate_hash_chain(entries)[source]¶
Validate integrity of a hash chain sequence.
Verifies that each entry's checksum correctly follows from the previous entry's checksum and the entry's effect data.
- Args:
entries: List of log entries, each containing 'effect' and 'checksum' fields
- Returns:
True if chain is valid, False if corruption detected
- Example:
>>> entries = [
...     {"effect": {"kind": "test1"}, "checksum": "abc123"},
...     {"effect": {"kind": "test2"}, "checksum": "def456"}
... ]
>>> validate_hash_chain(entries)
True
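Chain validation and corruption detection can both be illustrated by recomputing each checksum from the stored previous checksum. The helpers below (_checksum, build_chain, find_broken_links) are hypothetical, and the hashing scheme is an assumption rather than gunn's exact one.

```python
import hashlib
import json


def _checksum(effect, prev_checksum):
    """Assumed scheme: SHA-256 of previous checksum bytes plus canonical JSON of effect."""
    canonical = json.dumps(effect, sort_keys=True, separators=(",", ":"))
    data = (prev_checksum or "").encode("utf-8") + canonical.encode("utf-8")
    return hashlib.sha256(data).hexdigest()


def build_chain(effects):
    entries, prev = [], None
    for effect in effects:
        prev = _checksum(effect, prev)
        entries.append({"effect": effect, "checksum": prev})
    return entries


def find_broken_links(entries):
    """Return indices whose stored checksum does not match the recomputed one."""
    broken, prev = [], None
    for index, entry in enumerate(entries):
        if _checksum(entry["effect"], prev) != entry["checksum"]:
            broken.append(index)
        prev = entry["checksum"]  # continue from the stored value to localize the damage
    return broken


entries = build_chain([{"kind": "test1"}, {"kind": "test2"}, {"kind": "test3"}])
intact = find_broken_links(entries)
entries[1]["effect"]["kind"] = "tampered"
after_tampering = find_broken_links(entries)
```

A chain is valid exactly when find_broken_links() returns an empty list. Continuing from the stored checksum, rather than the recomputed one, keeps a single tampered entry from flagging every later entry as well.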
- gunn.utils.verify_sequence_integrity(entries)[source]¶
Comprehensive integrity verification of a log sequence.
Performs multiple integrity checks including hash chain validation, sequence gap detection, and corruption analysis.
- Args:
entries: List of log entries with required fields
- Returns:
Dictionary containing integrity report with:
valid: Overall validity boolean
corrupted_entries: List of corrupted entry indices
missing_sequences: List of detected sequence gaps
total_entries: Total number of entries checked
- Example:
>>> entries = [{"effect": {...}, "checksum": "...", "global_seq": 1}]
>>> verify_sequence_integrity(entries)
{"valid": True, "corrupted_entries": [], "missing_sequences": [], "total_entries": 1}
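The sequence gap detection part of such a report can be sketched on its own. The function name find_missing_sequences is hypothetical, and reporting every individual missing number (rather than gap ranges) is an assumption about the format of missing_sequences.

```python
def find_missing_sequences(entries):
    """List every global_seq value missing between the smallest and largest present."""
    seqs = sorted(entry["global_seq"] for entry in entries)
    missing = []
    for previous, current in zip(seqs, seqs[1:]):
        missing.extend(range(previous + 1, current))
    return missing


log_entries = [{"global_seq": 1}, {"global_seq": 2}, {"global_seq": 5}]
gaps = find_missing_sequences(log_entries)
```

For the entries above, sequence numbers 3 and 4 are reported as missing; a contiguous log yields an empty list.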