srunx.monitor package#
Submodules#
srunx.monitor.base module#
Base monitor class for SLURM monitoring implementations.
- class srunx.monitor.base.BaseMonitor(config=None, callbacks=None)[source]#
Bases: ABC
Abstract base class for SLURM monitoring implementations.
Provides common functionality for polling, timeout handling, and signal management. Subclasses implement condition checking and state retrieval.
- __init__(config=None, callbacks=None)[source]#
Initialize monitor with configuration and callbacks.
- Parameters:
config (MonitorConfig | None) – Monitoring configuration. Defaults to MonitorConfig() if None.
callbacks (list[Callback] | None) – List of notification callbacks. Defaults to empty list if None.
- Raises:
ValidationError – If config validation fails
- abstractmethod check_condition()[source]#
Check if monitoring condition is met.
Subclasses implement specific condition logic:
- JobMonitor: Check if job reached target state
- ResourceMonitor: Check if resource threshold met
- Return type:
bool
- Returns:
True if condition met (monitoring should stop in until-mode); False if condition not yet met.
- Raises:
SlurmError – If SLURM command fails
- abstractmethod get_current_state()[source]#
Get current monitoring state for comparison and logging.
Returns a dictionary with state information for:
- Duplicate notification prevention (continuous mode)
- State change detection
- Logging and debugging
- Returns:
Dictionary with current state. Structure varies by subclass:
- JobMonitor: {"job_id": int, "status": JobStatus}
- ResourceMonitor: {"partition": str, "gpus_available": int}
- Return type:
dict[str, Any]
- Raises:
SlurmError – If SLURM command fails
- watch_until()[source]#
Monitor until condition is met (blocking).
Polls at configured interval until:
1. check_condition() returns True -> success
2. Timeout reached -> TimeoutError
3. Signal received (Ctrl+C) -> graceful exit
- Raises:
TimeoutError – If timeout reached before condition met
SlurmError – If SLURM command fails repeatedly
- Return type:
None
- watch_continuous()[source]#
Monitor continuously until signal received (blocking).
Polls indefinitely and notifies on state changes:
1. Get current state
2. Compare with previous state
3. If different and notify_on_change: call callbacks
4. Sleep until next poll
5. Repeat until Ctrl+C or SIGTERM
Duplicate notifications prevented by state comparison.
- Raises:
SlurmError – If SLURM command fails repeatedly
- Return type:
None
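The until-mode loop above can be sketched in plain Python. This is an illustrative stand-in, not the actual srunx implementation; `check_condition` here is an ordinary callable standing in for the abstract method, and signal handling is omitted.

```python
import time

def watch_until_sketch(check_condition, poll_interval=60, timeout=3600):
    """Minimal sketch of the until-mode polling loop (illustrative names)."""
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        if check_condition():
            return True  # condition met -> success
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError("timeout reached before condition met")
        time.sleep(poll_interval)  # wait until the next poll
```

A `timeout` of None polls indefinitely, matching the continuous-mode behaviour of never timing out.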
srunx.monitor.job_monitor module#
Job monitoring implementation for SLURM.
- class srunx.monitor.job_monitor.JobMonitor(job_ids, target_statuses=None, config=None, callbacks=None, client=None)[source]#
Bases: BaseMonitor
Monitor SLURM jobs until they reach terminal states.
Polls jobs at configured intervals and notifies callbacks on state transitions. Supports monitoring single or multiple jobs with target status detection.
- __init__(job_ids, target_statuses=None, config=None, callbacks=None, client=None)[source]#
Initialize job monitor.
- Parameters:
job_ids (list[int]) – List of SLURM job IDs to monitor.
target_statuses (list[JobStatus] | None) – Terminal statuses to wait for. Defaults to [COMPLETED, FAILED, CANCELLED, TIMEOUT].
config (MonitorConfig | None) – Monitoring configuration. Defaults to MonitorConfig() if None.
callbacks (list[Callback] | None) – List of notification callbacks. Defaults to empty list if None.
client (Slurm | None) – SLURM client instance. Defaults to Slurm() if None.
- Raises:
ValueError – If job_ids is empty.
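The target-status check JobMonitor performs can be sketched as follows. Status strings and the helper name are illustrative, not the actual srunx API; the default set mirrors the documented defaults.

```python
# Default terminal statuses, per the target_statuses documentation above.
DEFAULT_TARGETS = {"COMPLETED", "FAILED", "CANCELLED", "TIMEOUT"}

def all_jobs_terminal(statuses, targets=DEFAULT_TARGETS):
    """Return True once every monitored job has reached a target status."""
    return all(status in targets for status in statuses)
```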
srunx.monitor.report_types module#
Data types for scheduled reporting.
- class srunx.monitor.report_types.ReportConfig(schedule, include=<factory>, partition=None, user=None, timeframe='24h', daemon=True, max_jobs=10)[source]#
Bases: object
Configuration for scheduled reporting.
- schedule: str#
- include: list[str]#
- partition: str | None = None#
- user: str | None = None#
- timeframe: str = '24h'#
- daemon: bool = True#
- max_jobs: int = 10#
- __init__(schedule, include=<factory>, partition=None, user=None, timeframe='24h', daemon=True, max_jobs=10)#
- class srunx.monitor.report_types.JobStats(pending, running, completed, failed, cancelled)[source]#
Bases: object
Job queue statistics.
- pending: int#
- running: int#
- completed: int#
- failed: int#
- cancelled: int#
- property total_active: int#
- __init__(pending, running, completed, failed, cancelled)#
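A sketch of JobStats with a plausible `total_active` definition. Note that counting pending plus running jobs as "active" is an assumption inferred from the field names, not taken from the source.

```python
from dataclasses import dataclass

@dataclass
class JobStats:
    """Sketch of the documented fields; total_active is assumed."""
    pending: int
    running: int
    completed: int
    failed: int
    cancelled: int

    @property
    def total_active(self) -> int:
        # Assumption: jobs still in the queue or executing count as active.
        return self.pending + self.running
```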
- class srunx.monitor.report_types.ResourceStats(partition, total_gpus, gpus_in_use, gpus_available, nodes_total, nodes_idle, nodes_down)[source]#
Bases: object
GPU and node resource statistics.
- partition: str | None#
- total_gpus: int#
- gpus_in_use: int#
- gpus_available: int#
- nodes_total: int#
- nodes_idle: int#
- nodes_down: int#
- property utilization: float#
- __init__(partition, total_gpus, gpus_in_use, gpus_available, nodes_total, nodes_idle, nodes_down)#
- class srunx.monitor.report_types.RunningJob(job_id, name, user, status, partition, runtime, nodes, gpus)[source]#
Bases: object
Information about a running or pending job.
- job_id: int#
- name: str#
- user: str#
- status: str#
- partition: str | None#
- runtime: timedelta | None#
- nodes: int#
- gpus: int#
- __init__(job_id, name, user, status, partition, runtime, nodes, gpus)#
- class srunx.monitor.report_types.Report(timestamp, job_stats=None, resource_stats=None, user_stats=None, running_jobs=<factory>)[source]#
Bases: object
Generated report containing requested statistics.
- timestamp: datetime#
- resource_stats: ResourceStats | None = None#
- running_jobs: list[RunningJob]#
- __init__(timestamp, job_stats=None, resource_stats=None, user_stats=None, running_jobs=<factory>)#
srunx.monitor.resource_monitor module#
Resource monitoring implementation for SLURM.
- class srunx.monitor.resource_monitor.ResourceMonitor(min_gpus, partition=None, config=None, callbacks=None)[source]#
Bases: BaseMonitor
Monitor SLURM GPU resources until availability threshold is met.
Polls partition resources at configured intervals and notifies callbacks when resources become available or exhausted.
- __init__(min_gpus, partition=None, config=None, callbacks=None)[source]#
Initialize resource monitor.
- Parameters:
min_gpus (int) – Minimum number of GPUs required for threshold.
partition (str | None) – SLURM partition to monitor. Defaults to all partitions if None.
config (MonitorConfig | None) – Monitoring configuration. Defaults to MonitorConfig() if None.
callbacks (list[Callback] | None) – List of notification callbacks. Defaults to empty list if None.
- Raises:
ValueError – If min_gpus < 0.
- check_condition()[source]#
Check if resource availability threshold is met.
- Return type:
bool
- Returns:
True if available GPUs >= min_gpus threshold, False otherwise.
- Raises:
SlurmError – If SLURM command fails.
- get_current_state()[source]#
Get current resource state for comparison and logging.
- Return type:
dict[str, Any]
- Returns:
Dictionary with current resource state. Format:
{"partition": str | None, "gpus_available": int, "gpus_total": int, "meets_threshold": bool}
- Raises:
SlurmError – If SLURM command fails.
- get_partition_resources()[source]#
Query SLURM for GPU resource availability.
Uses sinfo to get total GPUs per partition and squeue to get GPUs in use. Filters out DOWN/DRAIN/DRAINING nodes from availability calculation.
- Return type:
ResourceSnapshot
- Returns:
ResourceSnapshot with current resource state.
- Raises:
SlurmError – If SLURM command fails.
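The availability calculation described above (exclude DOWN/DRAIN/DRAINING nodes, then subtract GPUs in use) can be sketched as below. The node-record shape and helper name are illustrative assumptions, not the srunx implementation or actual sinfo output parsing.

```python
# Node states excluded from availability, per the description above.
EXCLUDED_STATES = {"DOWN", "DRAIN", "DRAINING"}

def gpus_available(nodes, gpus_in_use):
    """nodes: iterable of (state, gpu_count) pairs (hypothetical shape).

    Sums GPUs on usable nodes, then subtracts GPUs already in use,
    clamping at zero.
    """
    total = sum(gpus for state, gpus in nodes if state not in EXCLUDED_STATES)
    return max(total - gpus_in_use, 0)
```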
srunx.monitor.scheduler module#
Scheduled reporter for periodic SLURM status updates.
- class srunx.monitor.scheduler.ScheduledReporter(client, callback, config)[source]#
Bases: object
Scheduled reporter for periodic SLURM cluster status updates.
Generates and sends periodic reports containing job queue statistics, resource availability, and user-specific job information to configured callbacks (e.g., Slack webhooks).
- Parameters:
client (Slurm) – SLURM client for job operations
callback (Callback) – Callback for report delivery
config (ReportConfig) – Report configuration
Example
>>> from srunx import Slurm
>>> from srunx.callbacks import SlackCallback
>>> from srunx.monitor.scheduler import ScheduledReporter
>>> from srunx.monitor.report_types import ReportConfig
>>>
>>> client = Slurm()
>>> callback = SlackCallback(webhook_url)
>>> config = ReportConfig(schedule="1h", include=["jobs", "resources"])
>>>
>>> reporter = ScheduledReporter(client, callback, config)
>>> reporter.run()  # Blocking execution
srunx.monitor.types module#
Data models and types for SLURM monitoring.
- class srunx.monitor.types.WatchMode(*values)[source]#
Bases: StrEnum
Monitoring mode enumeration.
- UNTIL_CONDITION = 'until'#
Monitor until condition is met, then exit
- CONTINUOUS = 'continuous'#
Monitor indefinitely, notify on every state change
- class srunx.monitor.types.MonitorConfig(**data)[source]#
Bases: BaseModel
Configuration for monitoring operations.
- poll_interval: int#
- timeout: int | None#
- notify_on_change: bool#
- property is_aggressive: bool#
Check if polling interval is aggressive (<5 seconds).
- class Config[source]#
Bases: object
- json_schema_extra = {'examples': [{'mode': 'until', 'notify_on_change': True, 'poll_interval': 60, 'timeout': 3600}, {'mode': 'continuous', 'notify_on_change': True, 'poll_interval': 5, 'timeout': None}]}#
- model_config: ClassVar[ConfigDict] = {'json_schema_extra': {'examples': [{'mode': 'until', 'notify_on_change': True, 'poll_interval': 60, 'timeout': 3600}, {'mode': 'continuous', 'notify_on_change': True, 'poll_interval': 5, 'timeout': None}]}}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
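The is_aggressive property reduces to a simple comparison; a plain-function sketch of the documented rule (interval under 5 seconds counts as aggressive), rather than the actual pydantic property:

```python
def is_aggressive(poll_interval: int) -> bool:
    """True if the polling interval is aggressive (<5 seconds)."""
    return poll_interval < 5
```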
- class srunx.monitor.types.ResourceSnapshot(**data)[source]#
Bases: BaseModel
Point-in-time snapshot of SLURM partition resources.
- timestamp: datetime#
- partition: str | None#
- total_gpus: int#
- gpus_in_use: int#
- gpus_available: int#
- jobs_running: int#
- nodes_total: int#
- nodes_idle: int#
- nodes_down: int#
- property gpu_utilization: float#
GPU utilization percentage (0.0 to 1.0).
- property has_available_gpus: bool#
Check if any GPUs are available.
- meets_threshold(min_gpus)[source]#
Check if available GPUs meet minimum threshold.
- Parameters:
min_gpus (int) – Minimum required GPUs
- Return type:
bool
- Returns:
True if gpus_available >= min_gpus
- class Config[source]#
Bases: object
- json_schema_extra = {'examples': [{'gpus_available': 4, 'gpus_in_use': 12, 'jobs_running': 8, 'nodes_down': 1, 'nodes_idle': 2, 'nodes_total': 8, 'partition': 'gpu', 'timestamp': '2025-12-13T10:30:00', 'total_gpus': 16}]}#
- model_config: ClassVar[ConfigDict] = {'json_schema_extra': {'examples': [{'gpus_available': 4, 'gpus_in_use': 12, 'jobs_running': 8, 'nodes_down': 1, 'nodes_idle': 2, 'nodes_total': 8, 'partition': 'gpu', 'timestamp': '2025-12-13T10:30:00', 'total_gpus': 16}]}}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
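The derived values can be sketched against the documented semantics, using the figures from the schema example above (16 total GPUs, 12 in use, 4 available). These are plain-function stand-ins for the properties, with an assumed zero-GPU guard on utilization:

```python
def gpu_utilization(gpus_in_use: int, total_gpus: int) -> float:
    """GPU utilization as a fraction in 0.0-1.0 (0.0 when no GPUs exist)."""
    return gpus_in_use / total_gpus if total_gpus else 0.0

def meets_threshold(gpus_available: int, min_gpus: int) -> bool:
    """True if gpus_available >= min_gpus, per the documented contract."""
    return gpus_available >= min_gpus
```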
Module contents#
SLURM monitoring module.
This module provides job and resource monitoring capabilities for SLURM clusters, including configurable polling, Slack notifications, and both until-condition and continuous monitoring modes.
- class srunx.monitor.BaseMonitor(config=None, callbacks=None)[source]#
Bases: ABC
Abstract base class for SLURM monitoring implementations.
Provides common functionality for polling, timeout handling, and signal management. Subclasses implement condition checking and state retrieval.
- __init__(config=None, callbacks=None)[source]#
Initialize monitor with configuration and callbacks.
- Parameters:
config (MonitorConfig | None) – Monitoring configuration. Defaults to MonitorConfig() if None.
callbacks (list[Callback] | None) – List of notification callbacks. Defaults to empty list if None.
- Raises:
ValidationError – If config validation fails
- abstractmethod check_condition()[source]#
Check if monitoring condition is met.
Subclasses implement specific condition logic:
- JobMonitor: Check if job reached target state
- ResourceMonitor: Check if resource threshold met
- Return type:
bool
- Returns:
True if condition met (monitoring should stop in until-mode); False if condition not yet met.
- Raises:
SlurmError – If SLURM command fails
- abstractmethod get_current_state()[source]#
Get current monitoring state for comparison and logging.
Returns a dictionary with state information for:
- Duplicate notification prevention (continuous mode)
- State change detection
- Logging and debugging
- Returns:
Dictionary with current state. Structure varies by subclass:
- JobMonitor: {"job_id": int, "status": JobStatus}
- ResourceMonitor: {"partition": str, "gpus_available": int}
- Return type:
dict[str, Any]
- Raises:
SlurmError – If SLURM command fails
- watch_continuous()[source]#
Monitor continuously until signal received (blocking).
Polls indefinitely and notifies on state changes:
1. Get current state
2. Compare with previous state
3. If different and notify_on_change: call callbacks
4. Sleep until next poll
5. Repeat until Ctrl+C or SIGTERM
Duplicate notifications prevented by state comparison.
- Raises:
SlurmError – If SLURM command fails repeatedly
- Return type:
None
- watch_until()[source]#
Monitor until condition is met (blocking).
Polls at configured interval until:
1. check_condition() returns True -> success
2. Timeout reached -> TimeoutError
3. Signal received (Ctrl+C) -> graceful exit
- Raises:
TimeoutError – If timeout reached before condition met
SlurmError – If SLURM command fails repeatedly
- Return type:
None
- class srunx.monitor.JobMonitor(job_ids, target_statuses=None, config=None, callbacks=None, client=None)[source]#
Bases: BaseMonitor
Monitor SLURM jobs until they reach terminal states.
Polls jobs at configured intervals and notifies callbacks on state transitions. Supports monitoring single or multiple jobs with target status detection.
- __init__(job_ids, target_statuses=None, config=None, callbacks=None, client=None)[source]#
Initialize job monitor.
- Parameters:
job_ids (list[int]) – List of SLURM job IDs to monitor.
target_statuses (list[JobStatus] | None) – Terminal statuses to wait for. Defaults to [COMPLETED, FAILED, CANCELLED, TIMEOUT].
config (MonitorConfig | None) – Monitoring configuration. Defaults to MonitorConfig() if None.
callbacks (list[Callback] | None) – List of notification callbacks. Defaults to empty list if None.
client (Slurm | None) – SLURM client instance. Defaults to Slurm() if None.
- Raises:
ValueError – If job_ids is empty.
- class srunx.monitor.ResourceMonitor(min_gpus, partition=None, config=None, callbacks=None)[source]#
Bases: BaseMonitor
Monitor SLURM GPU resources until availability threshold is met.
Polls partition resources at configured intervals and notifies callbacks when resources become available or exhausted.
- __init__(min_gpus, partition=None, config=None, callbacks=None)[source]#
Initialize resource monitor.
- Parameters:
min_gpus (int) – Minimum number of GPUs required for threshold.
partition (str | None) – SLURM partition to monitor. Defaults to all partitions if None.
config (MonitorConfig | None) – Monitoring configuration. Defaults to MonitorConfig() if None.
callbacks (list[Callback] | None) – List of notification callbacks. Defaults to empty list if None.
- Raises:
ValueError – If min_gpus < 0.
- check_condition()[source]#
Check if resource availability threshold is met.
- Return type:
bool
- Returns:
True if available GPUs >= min_gpus threshold, False otherwise.
- Raises:
SlurmError – If SLURM command fails.
- get_current_state()[source]#
Get current resource state for comparison and logging.
- Return type:
dict[str, Any]
- Returns:
Dictionary with current resource state. Format:
{"partition": str | None, "gpus_available": int, "gpus_total": int, "meets_threshold": bool}
- Raises:
SlurmError – If SLURM command fails.
- get_partition_resources()[source]#
Query SLURM for GPU resource availability.
Uses sinfo to get total GPUs per partition and squeue to get GPUs in use. Filters out DOWN/DRAIN/DRAINING nodes from availability calculation.
- Return type:
ResourceSnapshot
- Returns:
ResourceSnapshot with current resource state.
- Raises:
SlurmError – If SLURM command fails.
- class srunx.monitor.MonitorConfig(**data)[source]#
Bases: BaseModel
Configuration for monitoring operations.
- class Config[source]#
Bases: object
- json_schema_extra = {'examples': [{'mode': 'until', 'notify_on_change': True, 'poll_interval': 60, 'timeout': 3600}, {'mode': 'continuous', 'notify_on_change': True, 'poll_interval': 5, 'timeout': None}]}#
- property is_aggressive: bool#
Check if polling interval is aggressive (<5 seconds).
- model_config: ClassVar[ConfigDict] = {'json_schema_extra': {'examples': [{'mode': 'until', 'notify_on_change': True, 'poll_interval': 60, 'timeout': 3600}, {'mode': 'continuous', 'notify_on_change': True, 'poll_interval': 5, 'timeout': None}]}}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- poll_interval: int#
- timeout: int | None#
- notify_on_change: bool#
- class srunx.monitor.ResourceSnapshot(**data)[source]#
Bases: BaseModel
Point-in-time snapshot of SLURM partition resources.
- class Config[source]#
Bases: object
- json_schema_extra = {'examples': [{'gpus_available': 4, 'gpus_in_use': 12, 'jobs_running': 8, 'nodes_down': 1, 'nodes_idle': 2, 'nodes_total': 8, 'partition': 'gpu', 'timestamp': '2025-12-13T10:30:00', 'total_gpus': 16}]}#
- property gpu_utilization: float#
GPU utilization percentage (0.0 to 1.0).
- property has_available_gpus: bool#
Check if any GPUs are available.
- meets_threshold(min_gpus)[source]#
Check if available GPUs meet minimum threshold.
- Parameters:
min_gpus (int) – Minimum required GPUs
- Return type:
bool
- Returns:
True if gpus_available >= min_gpus
- model_config: ClassVar[ConfigDict] = {'json_schema_extra': {'examples': [{'gpus_available': 4, 'gpus_in_use': 12, 'jobs_running': 8, 'nodes_down': 1, 'nodes_idle': 2, 'nodes_total': 8, 'partition': 'gpu', 'timestamp': '2025-12-13T10:30:00', 'total_gpus': 16}]}}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- timestamp: datetime#
- partition: str | None#
- total_gpus: int#
- gpus_in_use: int#
- gpus_available: int#
- jobs_running: int#
- nodes_total: int#
- nodes_idle: int#
- nodes_down: int#