"""Event log implementation for append-only storage with integrity checking.
This module provides the EventLog class that maintains an immutable sequence
of effects with hash chain integrity verification and replay capabilities.
"""
import asyncio
from typing import Any
from pydantic import BaseModel, Field
from gunn.schemas.types import Effect
from gunn.utils.hashing import chain_checksum
from gunn.utils.telemetry import (
MonotonicClock,
PerformanceTimer,
get_logger,
record_queue_depth,
)
class EventLogEntry(BaseModel):
    """Single entry in the event log with integrity checking.

    Each entry contains an effect along with metadata for ordering,
    timing, and hash chain integrity verification.
    """

    # Position in the global total order; assigned by EventLog.append().
    global_seq: int = Field(..., description="Monotonically increasing sequence number")
    # Simulation-clock timestamp (taken from the effect, or the monotonic
    # clock at append time if the effect carries none).
    sim_time: float = Field(..., description="Simulation time when effect occurred")
    # Wall-clock timestamp recorded when the entry was created.
    wall_time: float = Field(..., description="Wall clock time when logged")
    # The logged effect itself, stored as a copy so callers' dicts are never shared.
    effect: Effect = Field(..., description="The effect that occurred")
    # Audit metadata: always carries world_id; may also carry adapter/priority
    # copied from the effect payload.
    source_metadata: dict[str, Any] = Field(
        default_factory=dict, description="Additional metadata"
    )
    # Checksum chained onto the previous entry's checksum for tamper detection.
    checksum: str = Field(..., description="Hash chain checksum for integrity")
    # Caller-supplied request ID, used by find_entries_by_req_id for idempotency.
    req_id: str | None = Field(default=None, description="Request ID for idempotency")
class EventLog:
    """Append-only event log with hash chain integrity.

    Offers lock-serialized append operations, replay helpers, and
    integrity verification via hash chaining over the stored entries.

    Requirements addressed:
    - 1.2: Record events in global sequential log with global_seq identifier
    - 7.1: Maintain complete sequential log with global_seq numbers
    - 7.3: Provide replay capabilities from event log
    - 7.5: Validate state consistency using log hash/CRC and sequence gap detection
    """

    def __init__(self, world_id: str = "default") -> None:
        """Create an empty log scoped to a world.

        Args:
            world_id: Identifier for the world this log belongs to
        """
        self.world_id = world_id
        self._logger = get_logger("gunn.event_log", world_id=world_id)
        # Append-only storage; the counter outlives entries removed by compact().
        self._entries: list[EventLogEntry] = []
        self._seq_counter: int = 0
        # Serializes appends so sequence numbers stay unique and monotonic.
        self._lock = asyncio.Lock()
async def append(self, effect: Effect, req_id: str | None = None) -> int:
    """Append an effect to the log and return its assigned global_seq.

    Serialized through an asyncio lock so concurrent appends receive
    distinct, monotonically increasing sequence numbers, and each new
    entry's checksum chains onto the previous entry's checksum.

    Args:
        effect: Effect to append to the log
        req_id: Optional request ID for idempotency tracking

    Returns:
        The global_seq assigned to this entry

    Raises:
        ValueError: If effect is missing required fields
    """
    if not effect.get("uuid"):
        raise ValueError("Effect must have uuid field")
    if not effect.get("kind"):
        raise ValueError("Effect must have kind field")

    with PerformanceTimer("event_log_append", record_metrics=True):
        async with self._lock:
            # Assign the next sequence number under the lock.
            self._seq_counter += 1
            global_seq = self._seq_counter

            # Work on a copy so the caller's effect is never mutated;
            # stamp the sequence number unless one was already set.
            record = dict(effect)
            if record.get("global_seq", 0) == 0:
                record["global_seq"] = global_seq

            # Chain this entry's checksum onto the tail of the log (if any).
            tail_checksum = self._entries[-1].checksum if self._entries else None
            checksum = chain_checksum(record, tail_checksum)

            # world_id is always recorded for multi-tenant auditing;
            # adapter/priority are copied through when the payload has them.
            metadata: dict[str, Any] = {"world_id": self.world_id}
            payload = record.get("payload")
            if isinstance(payload, dict):
                for key in ("adapter", "priority"):
                    if key in payload:
                        metadata[key] = payload[key]

            entry = EventLogEntry(
                global_seq=global_seq,
                sim_time=record.get("sim_time", MonotonicClock.now()),  # type: ignore
                wall_time=MonotonicClock.wall_time(),
                effect=record,  # type: ignore
                source_metadata=metadata,
                checksum=checksum,
                req_id=req_id,
            )
            self._entries.append(entry)

            record_queue_depth("event_log", len(self._entries))
            self._logger.info(
                "Effect appended to log",
                global_seq=global_seq,
                effect_kind=record["kind"],
                effect_uuid=record["uuid"],
                req_id=req_id,
                checksum=checksum[:8],  # first 8 chars keep log lines readable
            )
            return global_seq
def get_entries_since(self, since_seq: int) -> list[EventLogEntry]:
    """Get entries for replay and catch-up.

    Returns all entries with global_seq > since_seq in order.

    Args:
        since_seq: Sequence number to start from (exclusive)

    Returns:
        List of entries after the specified sequence number
    """
    with PerformanceTimer("event_log_get_entries", record_metrics=True):
        # Entries are stored in ascending global_seq order, so one
        # filtering pass preserves ordering.
        result = [e for e in self._entries if e.global_seq > since_seq]
        self._logger.debug(
            "Retrieved entries for replay",
            since_seq=since_seq,
            num_entries=len(result),
            latest_seq=self._entries[-1].global_seq if self._entries else 0,
        )
        return result
def get_all_entries(self) -> list[EventLogEntry]:
    """Return every entry currently held in the log.

    Returns:
        Complete list of all log entries
    """
    # Hand back a shallow copy so callers cannot mutate the append-only list.
    return list(self._entries)
def get_latest_seq(self) -> int:
    """Return the highest global_seq ever assigned.

    Returns:
        Latest global_seq, or 0 if log is empty
    """
    # Reads the counter, not the entries, so the value survives compact().
    return self._seq_counter
def get_entry_count(self) -> int:
    """Return how many entries the log currently holds.

    Returns:
        Number of entries in the log
    """
    return len(self._entries)
def validate_integrity(self) -> dict[str, Any]:
    """Validate complete log integrity.

    Performs comprehensive integrity checks including:
    - Hash chain validation
    - Sequence gap detection
    - Corruption analysis

    The check is compaction-aware: when compact() has removed the oldest
    entries, the first remaining entry has no known predecessor checksum
    and an intentionally non-1 starting sequence, so it is neither
    reported as corruption nor are the dropped sequences reported as gaps.
    For an uncompacted log (first seq == 1) behavior is unchanged.

    Returns:
        Dictionary with integrity report containing:
        - valid: Overall validity boolean
        - corrupted_entries: List of corrupted entry indices
        - missing_sequences: List of detected sequence gaps
        - total_entries: Total number of entries checked
        - details: Additional diagnostic information
    """
    with PerformanceTimer("event_log_validate", record_metrics=True):
        corrupted_entries: list[int] = []
        missing_sequences: list[int] = []

        # Hash chain: each stored checksum must equal the checksum of the
        # effect chained onto the previous entry's stored checksum.
        prev_checksum = None
        for i, entry in enumerate(self._entries):
            if i == 0 and entry.global_seq > 1:
                # Predecessor was removed by compact(); its checksum is
                # unknown, so the first entry cannot be re-verified here.
                prev_checksum = entry.checksum
                continue
            expected_checksum = chain_checksum(dict(entry.effect), prev_checksum)
            if expected_checksum != entry.checksum:
                corrupted_entries.append(i)
            # Chain on the *stored* checksum so a single corrupted entry
            # does not cascade into flagging every later entry.
            prev_checksum = entry.checksum

        # Sequence gaps: start from the first retained entry's seq so
        # entries dropped by compact() are not reported as missing.
        if self._entries:
            expected_seq = self._entries[0].global_seq
            for entry in self._entries:
                if entry.global_seq != expected_seq:
                    # Record every missing sequence number in the gap.
                    while expected_seq < entry.global_seq:
                        missing_sequences.append(expected_seq)
                        expected_seq += 1
                expected_seq = entry.global_seq + 1

        is_valid = len(corrupted_entries) == 0 and len(missing_sequences) == 0

        result = {
            "valid": is_valid,
            "corrupted_entries": corrupted_entries,
            "missing_sequences": missing_sequences,
            "total_entries": len(self._entries),
            "details": {
                "latest_seq": self._seq_counter,
                "first_seq": self._entries[0].global_seq if self._entries else None,
                "last_seq": self._entries[-1].global_seq if self._entries else None,
                "world_id": self.world_id,
            },
        }

        self._logger.info(
            "Log integrity validation completed",
            valid=is_valid,
            corrupted_count=len(corrupted_entries),
            missing_count=len(missing_sequences),
            total_entries=len(self._entries),
        )
        return result
def find_entry_by_uuid(self, effect_uuid: str) -> EventLogEntry | None:
    """Find entry by effect UUID.

    Args:
        effect_uuid: UUID of the effect to find

    Returns:
        EventLogEntry if found, None otherwise
    """
    # Linear scan; yields the first (oldest) match, None when absent.
    return next(
        (e for e in self._entries if e.effect.get("uuid") == effect_uuid),
        None,
    )
def find_entries_by_req_id(self, req_id: str) -> list[EventLogEntry]:
    """Find entries by request ID.

    Args:
        req_id: Request ID to search for

    Returns:
        List of entries with matching request ID, in log order
    """
    # Single filtering pass over the ordered log.
    return [entry for entry in self._entries if entry.req_id == req_id]
def get_entries_by_source(self, source_id: str) -> list[EventLogEntry]:
    """Get all entries from a specific source.

    Args:
        source_id: Source identifier to filter by

    Returns:
        List of entries from the specified source, in log order
    """
    # Single filtering pass; effects missing source_id never match.
    return [
        entry
        for entry in self._entries
        if entry.effect.get("source_id") == source_id
    ]
def get_entries_in_time_range(
    self,
    start_time: float | None = None,
    end_time: float | None = None,
    use_sim_time: bool = True,
) -> list[EventLogEntry]:
    """Get entries within a time range.

    Args:
        start_time: Start time (inclusive), None for no lower bound
        end_time: End time (inclusive), None for no upper bound
        use_sim_time: If True, filter by sim_time; if False, use wall_time

    Returns:
        List of entries within the specified time range
    """

    def _selected(entry: EventLogEntry) -> bool:
        # Pick the clock axis, then apply each bound only when it is set.
        stamp = entry.sim_time if use_sim_time else entry.wall_time
        if start_time is not None and stamp < start_time:
            return False
        if end_time is not None and stamp > end_time:
            return False
        return True

    return [entry for entry in self._entries if _selected(entry)]
async def compact(self, keep_entries: int = 1000) -> int:
    """Compact the log by removing old entries.

    Drops the oldest entries so only the most recent keep_entries remain,
    bounding memory use while leaving the retained hash chain intact.

    Args:
        keep_entries: Number of recent entries to keep

    Returns:
        Number of entries removed
    """
    async with self._lock:
        removed = len(self._entries) - keep_entries
        if removed <= 0:
            # Already at or below the target size; nothing to drop.
            return 0
        self._entries = self._entries[removed:]
        self._logger.info(
            "Log compacted",
            removed_count=removed,
            remaining_count=len(self._entries),
            oldest_remaining_seq=self._entries[0].global_seq
            if self._entries
            else None,
        )
        return removed
def get_stats(self) -> dict[str, Any]:
    """Get log statistics.

    Returns:
        Dictionary with log statistics including entry counts,
        time ranges, and integrity status
    """
    if not self._entries:
        # Empty log: no sequence or time ranges to report.
        return {
            "total_entries": 0,
            "latest_seq": 0,
            "time_range": None,
            "world_id": self.world_id,
        }

    oldest = self._entries[0]
    newest = self._entries[-1]

    def _span(first: float, last: float) -> dict[str, float]:
        # Shared shape for both the sim-time and wall-time ranges.
        return {"first": first, "last": last, "duration": last - first}

    return {
        "total_entries": len(self._entries),
        "latest_seq": self._seq_counter,
        "seq_range": {
            "first": oldest.global_seq,
            "last": newest.global_seq,
        },
        "time_range": {
            "sim_time": _span(oldest.sim_time, newest.sim_time),
            "wall_time": _span(oldest.wall_time, newest.wall_time),
        },
        "world_id": self.world_id,
    }