srunx.monitor package#

Submodules#

srunx.monitor.base module#

Base monitor class for SLURM monitoring implementations.

class srunx.monitor.base.BaseMonitor(config=None, callbacks=None)[source]#

Bases: ABC

Abstract base class for SLURM monitoring implementations.

Provides common functionality for polling, timeout handling, and signal management. Subclasses implement condition checking and state retrieval.

__init__(config=None, callbacks=None)[source]#

Initialize monitor with configuration and callbacks.

Parameters:
  • config (MonitorConfig | None) – Monitoring configuration. Defaults to MonitorConfig() if None.

  • callbacks (list[Callback] | None) – List of notification callbacks. Defaults to empty list if None.

Raises:

ValidationError – If config validation fails

abstractmethod check_condition()[source]#

Check if monitoring condition is met.

Subclasses implement specific condition logic:

  • JobMonitor: Check if job reached target state

  • ResourceMonitor: Check if resource threshold met

Return type:

bool

Returns:

True if condition met (monitoring should stop in until-mode); False if condition not yet met.

Raises:

SlurmError – If SLURM command fails

abstractmethod get_current_state()[source]#

Get current monitoring state for comparison and logging.

Returns dictionary with state information for:

  • Duplicate notification prevention (continuous mode)

  • State change detection

  • Logging and debugging

Returns:

Dictionary with current state. Structure varies by subclass:

  • JobMonitor: {"job_id": int, "status": JobStatus}

  • ResourceMonitor: {"partition": str, "gpus_available": int}

Return type:

dict[str, Any]

Raises:

SlurmError – If SLURM command fails

watch_until()[source]#

Monitor until condition is met (blocking).

Polls at configured interval until:

  1. check_condition() returns True -> success

  2. Timeout reached -> TimeoutError

  3. Signal received (Ctrl+C) -> graceful exit

Raises:
  • TimeoutError – If timeout reached before condition met

  • SlurmError – If SLURM command fails repeatedly

Return type:

None

watch_continuous()[source]#

Monitor continuously until signal received (blocking).

Polls indefinitely and notifies on state changes:

  1. Get current state

  2. Compare with previous state

  3. If different and notify_on_change: call callbacks

  4. Sleep until next poll

  5. Repeat until Ctrl+C or SIGTERM

Duplicate notifications prevented by state comparison.

Raises:

SlurmError – If SLURM command fails repeatedly

Return type:

None
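
Example

A minimal sketch of a custom monitor built on BaseMonitor. The QueueDepthMonitor class, its max_pending parameter, and the _count_pending() helper are hypothetical and for illustration only; only __init__(config, callbacks), the two abstract hooks, and the inherited watch_until() / watch_continuous() loops come from this module.

from typing import Any

from srunx.monitor.base import BaseMonitor


class QueueDepthMonitor(BaseMonitor):
    """Hypothetical monitor: wait until the pending-job count drops low enough."""

    def __init__(self, max_pending, config=None, callbacks=None):
        super().__init__(config=config, callbacks=callbacks)
        self.max_pending = max_pending

    def check_condition(self) -> bool:
        # True stops watch_until(); here, the queue is short enough.
        return self._count_pending() <= self.max_pending

    def get_current_state(self) -> dict[str, Any]:
        # Compared between polls in continuous mode to detect changes.
        return {"pending": self._count_pending()}

    def _count_pending(self) -> int:
        ...  # e.g. count lines of `squeue --states=PD --noheader` output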

srunx.monitor.job_monitor module#

Job monitoring implementation for SLURM.

class srunx.monitor.job_monitor.JobMonitor(job_ids, target_statuses=None, config=None, callbacks=None, client=None)[source]#

Bases: BaseMonitor

Monitor SLURM jobs until they reach terminal states.

Polls jobs at configured intervals and notifies callbacks on state transitions. Supports monitoring single or multiple jobs with target status detection.

__init__(job_ids, target_statuses=None, config=None, callbacks=None, client=None)[source]#

Initialize job monitor.

Parameters:
  • job_ids (list[int]) – List of SLURM job IDs to monitor.

  • target_statuses (list[JobStatus] | None) – Terminal statuses to wait for. Defaults to [COMPLETED, FAILED, CANCELLED, TIMEOUT].

  • config (MonitorConfig | None) – Monitoring configuration. Defaults to MonitorConfig() if None.

  • callbacks (list[Callback] | None) – List of notification callbacks. Defaults to empty list if None.

  • client (Slurm | None) – SLURM client instance. Defaults to Slurm() if None.

Raises:

ValueError – If job_ids is empty.

check_condition()[source]#

Check if all monitored jobs have reached target statuses.

Return type:

bool

Returns:

True if all jobs have reached a target status, False otherwise.

Raises:

SlurmError – If SLURM command fails.

get_current_state()[source]#

Get current state of all monitored jobs.

Return type:

dict[str, Any]

Returns:

Dictionary mapping job IDs (as strings) to their current statuses. Format: {str(job_id): status_value, …}

Raises:

SlurmError – If SLURM command fails.
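
Example

A usage sketch for JobMonitor in until-mode. The webhook_url variable is a placeholder; the MonitorConfig field values mirror the examples documented under srunx.monitor.types.MonitorConfig, and the Slurm / SlackCallback import paths follow the ScheduledReporter example below.

>>> from srunx import Slurm
>>> from srunx.callbacks import SlackCallback
>>> from srunx.monitor.job_monitor import JobMonitor
>>> from srunx.monitor.types import MonitorConfig, WatchMode
>>>
>>> config = MonitorConfig(poll_interval=60, timeout=3600,
...                        mode=WatchMode.UNTIL_CONDITION, notify_on_change=True)
>>> monitor = JobMonitor(
...     job_ids=[12345, 12346],
...     config=config,
...     callbacks=[SlackCallback(webhook_url)],
...     client=Slurm(),
... )
>>> monitor.watch_until()  # blocks until all jobs reach a target status or timeout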

srunx.monitor.report_types module#

Data types for scheduled reporting.

class srunx.monitor.report_types.ReportConfig(schedule, include=<factory>, partition=None, user=None, timeframe='24h', daemon=True, max_jobs=10)[source]#

Bases: object

Configuration for scheduled reporting.

schedule: str#
include: list[str]#
partition: str | None = None#
user: str | None = None#
timeframe: str = '24h'#
daemon: bool = True#
max_jobs: int = 10#
__post_init__()[source]#

Validate configuration.

Return type:

None

is_cron_format()[source]#

Check if schedule is in cron format.

Return type:

bool

__init__(schedule, include=<factory>, partition=None, user=None, timeframe='24h', daemon=True, max_jobs=10)#
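
Example

A construction sketch for ReportConfig. The "1h" interval form comes from the ScheduledReporter example below; treating a five-field expression such as "0 9 * * *" as cron format is an assumption based on is_cron_format().

>>> from srunx.monitor.report_types import ReportConfig
>>>
>>> interval_cfg = ReportConfig(schedule="1h", include=["jobs", "resources"])
>>> interval_cfg.is_cron_format()   # expected: False (interval form)
>>>
>>> cron_cfg = ReportConfig(schedule="0 9 * * *", include=["jobs"],
...                         partition="gpu", max_jobs=5)
>>> cron_cfg.is_cron_format()       # expected: True (assumed cron support)
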
class srunx.monitor.report_types.JobStats(pending, running, completed, failed, cancelled)[source]#

Bases: object

Job queue statistics.

pending: int#
running: int#
completed: int#
failed: int#
cancelled: int#
property total_active: int#
__init__(pending, running, completed, failed, cancelled)#
class srunx.monitor.report_types.ResourceStats(partition, total_gpus, gpus_in_use, gpus_available, nodes_total, nodes_idle, nodes_down)[source]#

Bases: object

GPU and node resource statistics.

partition: str | None#
total_gpus: int#
gpus_in_use: int#
gpus_available: int#
nodes_total: int#
nodes_idle: int#
nodes_down: int#
property utilization: float#
__init__(partition, total_gpus, gpus_in_use, gpus_available, nodes_total, nodes_idle, nodes_down)#
class srunx.monitor.report_types.RunningJob(job_id, name, user, status, partition, runtime, nodes, gpus)[source]#

Bases: object

Information about a running or pending job.

job_id: int#
name: str#
user: str#
status: str#
partition: str | None#
runtime: timedelta | None#
nodes: int#
gpus: int#
__init__(job_id, name, user, status, partition, runtime, nodes, gpus)#
class srunx.monitor.report_types.Report(timestamp, job_stats=None, resource_stats=None, user_stats=None, running_jobs=<factory>)[source]#

Bases: object

Generated report containing requested statistics.

timestamp: datetime#
job_stats: JobStats | None = None#
resource_stats: ResourceStats | None = None#
user_stats: JobStats | None = None#
running_jobs: list[RunningJob]#
__init__(timestamp, job_stats=None, resource_stats=None, user_stats=None, running_jobs=<factory>)#
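
Example

A sketch assembling a Report from the dataclasses above. The field values are placeholders, and reading total_active as pending + running is an assumption about that property.

>>> from datetime import datetime, timedelta
>>> from srunx.monitor.report_types import JobStats, Report, RunningJob
>>>
>>> stats = JobStats(pending=3, running=5, completed=40, failed=2, cancelled=1)
>>> stats.total_active              # assumed: pending + running
>>> job = RunningJob(job_id=12345, name="train", user="alice", status="RUNNING",
...                  partition="gpu", runtime=timedelta(hours=2), nodes=1, gpus=4)
>>> report = Report(timestamp=datetime.now(), job_stats=stats, running_jobs=[job])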

srunx.monitor.resource_monitor module#

Resource monitoring implementation for SLURM.

class srunx.monitor.resource_monitor.ResourceMonitor(min_gpus, partition=None, config=None, callbacks=None)[source]#

Bases: BaseMonitor

Monitor SLURM GPU resources until availability threshold is met.

Polls partition resources at configured intervals and notifies callbacks when resources become available or exhausted.

__init__(min_gpus, partition=None, config=None, callbacks=None)[source]#

Initialize resource monitor.

Parameters:
  • min_gpus (int) – Minimum number of GPUs required for threshold.

  • partition (str | None) – SLURM partition to monitor. Defaults to all partitions if None.

  • config (MonitorConfig | None) – Monitoring configuration. Defaults to MonitorConfig() if None.

  • callbacks (list[Callback] | None) – List of notification callbacks. Defaults to empty list if None.

Raises:

ValueError – If min_gpus < 0.

check_condition()[source]#

Check if resource availability threshold is met.

Return type:

bool

Returns:

True if available GPUs >= min_gpus threshold, False otherwise.

Raises:

SlurmError – If SLURM command fails.

get_current_state()[source]#

Get current resource state for comparison and logging.

Return type:

dict[str, Any]

Returns:

Dictionary with current resource state. Format:

{"partition": str | None, "gpus_available": int, "gpus_total": int, "meets_threshold": bool}

Raises:

SlurmError – If SLURM command fails.

get_partition_resources()[source]#

Query SLURM for GPU resource availability.

Uses sinfo to get total GPUs per partition and squeue to get GPUs in use. Filters out DOWN/DRAIN/DRAINING nodes from availability calculation.

Return type:

ResourceSnapshot

Returns:

ResourceSnapshot with current resource state.

Raises:

SlurmError – If SLURM command fails.
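
Example

A usage sketch for ResourceMonitor: block until at least 4 GPUs are free on the "gpu" partition. The webhook_url variable is a placeholder and the partition name is illustrative.

>>> from srunx.callbacks import SlackCallback
>>> from srunx.monitor.resource_monitor import ResourceMonitor
>>> from srunx.monitor.types import MonitorConfig, WatchMode
>>>
>>> config = MonitorConfig(poll_interval=30, timeout=None,
...                        mode=WatchMode.UNTIL_CONDITION, notify_on_change=True)
>>> monitor = ResourceMonitor(min_gpus=4, partition="gpu", config=config,
...                           callbacks=[SlackCallback(webhook_url)])
>>> monitor.watch_until()           # blocks until >= 4 GPUs are available
>>> snapshot = monitor.get_partition_resources()
>>> snapshot.meets_threshold(4)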

srunx.monitor.scheduler module#

Scheduled reporter for periodic SLURM status updates.

class srunx.monitor.scheduler.ScheduledReporter(client, callback, config)[source]#

Bases: object

Scheduled reporter for periodic SLURM cluster status updates.

Generates and sends periodic reports containing job queue statistics, resource availability, and user-specific job information to configured callbacks (e.g., Slack webhooks).

Parameters:
  • client (Slurm) – SLURM client for job operations

  • callback (Callback) – Callback for report delivery

  • config (ReportConfig) – Report configuration

Example

>>> from srunx import Slurm
>>> from srunx.callbacks import SlackCallback
>>> from srunx.monitor.scheduler import ScheduledReporter
>>> from srunx.monitor.report_types import ReportConfig
>>>
>>> client = Slurm()
>>> callback = SlackCallback(webhook_url)
>>> config = ReportConfig(schedule="1h", include=["jobs", "resources"])
>>>
>>> reporter = ScheduledReporter(client, callback, config)
>>> reporter.run()  # Blocking execution
__init__(client, callback, config)[source]#

Initialize scheduled reporter.

run()[source]#

Start scheduler in blocking mode.

Runs until interrupted by SIGINT or SIGTERM.

Return type:

None

stop()[source]#

Stop the scheduler gracefully.

Return type:

None

srunx.monitor.types module#

Data models and types for SLURM monitoring.

class srunx.monitor.types.WatchMode(*values)[source]#

Bases: StrEnum

Monitoring mode enumeration.

UNTIL_CONDITION = 'until'#

Monitor until condition is met, then exit

CONTINUOUS = 'continuous'#

Monitor indefinitely, notify on every state change

class srunx.monitor.types.MonitorConfig(**data)[source]#

Bases: BaseModel

Configuration for monitoring operations.

poll_interval: int#
timeout: int | None#
mode: WatchMode#
notify_on_change: bool#
property is_aggressive: bool#

Check if polling interval is aggressive (<5 seconds).

class Config[source]#

Bases: object

json_schema_extra = {'examples': [{'mode': 'until', 'notify_on_change': True, 'poll_interval': 60, 'timeout': 3600}, {'mode': 'continuous', 'notify_on_change': True, 'poll_interval': 5, 'timeout': None}]}#
model_config: ClassVar[ConfigDict] = {'json_schema_extra': {'examples': [{'mode': 'until', 'notify_on_change': True, 'poll_interval': 60, 'timeout': 3600}, {'mode': 'continuous', 'notify_on_change': True, 'poll_interval': 5, 'timeout': None}]}}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
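
Example

Construction sketches for MonitorConfig; the field values are taken from the json_schema_extra examples above.

>>> from srunx.monitor.types import MonitorConfig, WatchMode
>>>
>>> until_cfg = MonitorConfig(poll_interval=60, timeout=3600,
...                           mode=WatchMode.UNTIL_CONDITION, notify_on_change=True)
>>> until_cfg.is_aggressive         # False: 60 s is not < 5 s
>>>
>>> continuous_cfg = MonitorConfig(poll_interval=5, timeout=None,
...                                mode=WatchMode.CONTINUOUS, notify_on_change=True)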

class srunx.monitor.types.ResourceSnapshot(**data)[source]#

Bases: BaseModel

Point-in-time snapshot of SLURM partition resources.

timestamp: datetime#
partition: str | None#
total_gpus: int#
gpus_in_use: int#
gpus_available: int#
jobs_running: int#
nodes_total: int#
nodes_idle: int#
nodes_down: int#
property gpu_utilization: float#

GPU utilization percentage (0.0 to 1.0).

property has_available_gpus: bool#

Check if any GPUs are available.

meets_threshold(min_gpus)[source]#

Check if available GPUs meet minimum threshold.

Parameters:

min_gpus (int) – Minimum required GPUs

Return type:

bool

Returns:

True if gpus_available >= min_gpus

class Config[source]#

Bases: object

json_schema_extra = {'examples': [{'gpus_available': 4, 'gpus_in_use': 12, 'jobs_running': 8, 'nodes_down': 1, 'nodes_idle': 2, 'nodes_total': 8, 'partition': 'gpu', 'timestamp': '2025-12-13T10:30:00', 'total_gpus': 16}]}#
model_config: ClassVar[ConfigDict] = {'json_schema_extra': {'examples': [{'gpus_available': 4, 'gpus_in_use': 12, 'jobs_running': 8, 'nodes_down': 1, 'nodes_idle': 2, 'nodes_total': 8, 'partition': 'gpu', 'timestamp': '2025-12-13T10:30:00', 'total_gpus': 16}]}}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
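
Example

A construction sketch for ResourceSnapshot using the json_schema_extra example values above. The gpu_utilization comment assumes the ratio gpus_in_use / total_gpus.

>>> from datetime import datetime
>>> from srunx.monitor.types import ResourceSnapshot
>>>
>>> snap = ResourceSnapshot(timestamp=datetime(2025, 12, 13, 10, 30), partition="gpu",
...                         total_gpus=16, gpus_in_use=12, gpus_available=4,
...                         jobs_running=8, nodes_total=8, nodes_idle=2, nodes_down=1)
>>> snap.gpu_utilization            # assumed 12 / 16 = 0.75
>>> snap.has_available_gpus         # True: gpus_available > 0
>>> snap.meets_threshold(8)         # False: only 4 GPUs available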

Module contents#

SLURM monitoring module.

This module provides job and resource monitoring capabilities for SLURM clusters, including configurable polling, Slack notifications, and both until-condition and continuous monitoring modes.
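
Example

A quickstart sketch using the package-level re-exports documented below, running in continuous mode; webhook_url is a placeholder.

>>> from srunx.callbacks import SlackCallback
>>> from srunx.monitor import JobMonitor, MonitorConfig, WatchMode
>>>
>>> config = MonitorConfig(poll_interval=5, timeout=None,
...                        mode=WatchMode.CONTINUOUS, notify_on_change=True)
>>> monitor = JobMonitor(job_ids=[12345], config=config,
...                      callbacks=[SlackCallback(webhook_url)])
>>> monitor.watch_continuous()      # notifies on each status change until Ctrl+C / SIGTERM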

class srunx.monitor.BaseMonitor(config=None, callbacks=None)[source]#

Bases: ABC

Abstract base class for SLURM monitoring implementations.

Provides common functionality for polling, timeout handling, and signal management. Subclasses implement condition checking and state retrieval.

__init__(config=None, callbacks=None)[source]#

Initialize monitor with configuration and callbacks.

Parameters:
  • config (MonitorConfig | None) – Monitoring configuration. Defaults to MonitorConfig() if None.

  • callbacks (list[Callback] | None) – List of notification callbacks. Defaults to empty list if None.

Raises:

ValidationError – If config validation fails

abstractmethod check_condition()[source]#

Check if monitoring condition is met.

Subclasses implement specific condition logic:

  • JobMonitor: Check if job reached target state

  • ResourceMonitor: Check if resource threshold met

Return type:

bool

Returns:

True if condition met (monitoring should stop in until-mode); False if condition not yet met.

Raises:

SlurmError – If SLURM command fails

abstractmethod get_current_state()[source]#

Get current monitoring state for comparison and logging.

Returns dictionary with state information for:

  • Duplicate notification prevention (continuous mode)

  • State change detection

  • Logging and debugging

Returns:

Dictionary with current state. Structure varies by subclass:

  • JobMonitor: {"job_id": int, "status": JobStatus}

  • ResourceMonitor: {"partition": str, "gpus_available": int}

Return type:

dict[str, Any]

Raises:

SlurmError – If SLURM command fails

watch_continuous()[source]#

Monitor continuously until signal received (blocking).

Polls indefinitely and notifies on state changes:

  1. Get current state

  2. Compare with previous state

  3. If different and notify_on_change: call callbacks

  4. Sleep until next poll

  5. Repeat until Ctrl+C or SIGTERM

Duplicate notifications prevented by state comparison.

Raises:

SlurmError – If SLURM command fails repeatedly

Return type:

None

watch_until()[source]#

Monitor until condition is met (blocking).

Polls at configured interval until:

  1. check_condition() returns True -> success

  2. Timeout reached -> TimeoutError

  3. Signal received (Ctrl+C) -> graceful exit

Raises:
  • TimeoutError – If timeout reached before condition met

  • SlurmError – If SLURM command fails repeatedly

Return type:

None

class srunx.monitor.JobMonitor(job_ids, target_statuses=None, config=None, callbacks=None, client=None)[source]#

Bases: BaseMonitor

Monitor SLURM jobs until they reach terminal states.

Polls jobs at configured intervals and notifies callbacks on state transitions. Supports monitoring single or multiple jobs with target status detection.

__init__(job_ids, target_statuses=None, config=None, callbacks=None, client=None)[source]#

Initialize job monitor.

Parameters:
  • job_ids (list[int]) – List of SLURM job IDs to monitor.

  • target_statuses (list[JobStatus] | None) – Terminal statuses to wait for. Defaults to [COMPLETED, FAILED, CANCELLED, TIMEOUT].

  • config (MonitorConfig | None) – Monitoring configuration. Defaults to MonitorConfig() if None.

  • callbacks (list[Callback] | None) – List of notification callbacks. Defaults to empty list if None.

  • client (Slurm | None) – SLURM client instance. Defaults to Slurm() if None.

Raises:

ValueError – If job_ids is empty.

check_condition()[source]#

Check if all monitored jobs have reached target statuses.

Return type:

bool

Returns:

True if all jobs have reached a target status, False otherwise.

Raises:

SlurmError – If SLURM command fails.

get_current_state()[source]#

Get current state of all monitored jobs.

Return type:

dict[str, Any]

Returns:

Dictionary mapping job IDs (as strings) to their current statuses. Format: {str(job_id): status_value, …}

Raises:

SlurmError – If SLURM command fails.

class srunx.monitor.ResourceMonitor(min_gpus, partition=None, config=None, callbacks=None)[source]#

Bases: BaseMonitor

Monitor SLURM GPU resources until availability threshold is met.

Polls partition resources at configured intervals and notifies callbacks when resources become available or exhausted.

__init__(min_gpus, partition=None, config=None, callbacks=None)[source]#

Initialize resource monitor.

Parameters:
  • min_gpus (int) – Minimum number of GPUs required for threshold.

  • partition (str | None) – SLURM partition to monitor. Defaults to all partitions if None.

  • config (MonitorConfig | None) – Monitoring configuration. Defaults to MonitorConfig() if None.

  • callbacks (list[Callback] | None) – List of notification callbacks. Defaults to empty list if None.

Raises:

ValueError – If min_gpus < 0.

check_condition()[source]#

Check if resource availability threshold is met.

Return type:

bool

Returns:

True if available GPUs >= min_gpus threshold, False otherwise.

Raises:

SlurmError – If SLURM command fails.

get_current_state()[source]#

Get current resource state for comparison and logging.

Return type:

dict[str, Any]

Returns:

Dictionary with current resource state. Format:

{"partition": str | None, "gpus_available": int, "gpus_total": int, "meets_threshold": bool}

Raises:

SlurmError – If SLURM command fails.

get_partition_resources()[source]#

Query SLURM for GPU resource availability.

Uses sinfo to get total GPUs per partition and squeue to get GPUs in use. Filters out DOWN/DRAIN/DRAINING nodes from availability calculation.

Return type:

ResourceSnapshot

Returns:

ResourceSnapshot with current resource state.

Raises:

SlurmError – If SLURM command fails.

class srunx.monitor.MonitorConfig(**data)[source]#

Bases: BaseModel

Configuration for monitoring operations.

class Config[source]#

Bases: object

json_schema_extra = {'examples': [{'mode': 'until', 'notify_on_change': True, 'poll_interval': 60, 'timeout': 3600}, {'mode': 'continuous', 'notify_on_change': True, 'poll_interval': 5, 'timeout': None}]}#
property is_aggressive: bool#

Check if polling interval is aggressive (<5 seconds).

model_config: ClassVar[ConfigDict] = {'json_schema_extra': {'examples': [{'mode': 'until', 'notify_on_change': True, 'poll_interval': 60, 'timeout': 3600}, {'mode': 'continuous', 'notify_on_change': True, 'poll_interval': 5, 'timeout': None}]}}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

poll_interval: int#
timeout: int | None#
mode: WatchMode#
notify_on_change: bool#
class srunx.monitor.ResourceSnapshot(**data)[source]#

Bases: BaseModel

Point-in-time snapshot of SLURM partition resources.

class Config[source]#

Bases: object

json_schema_extra = {'examples': [{'gpus_available': 4, 'gpus_in_use': 12, 'jobs_running': 8, 'nodes_down': 1, 'nodes_idle': 2, 'nodes_total': 8, 'partition': 'gpu', 'timestamp': '2025-12-13T10:30:00', 'total_gpus': 16}]}#
property gpu_utilization: float#

GPU utilization percentage (0.0 to 1.0).

property has_available_gpus: bool#

Check if any GPUs are available.

meets_threshold(min_gpus)[source]#

Check if available GPUs meet minimum threshold.

Parameters:

min_gpus (int) – Minimum required GPUs

Return type:

bool

Returns:

True if gpus_available >= min_gpus

model_config: ClassVar[ConfigDict] = {'json_schema_extra': {'examples': [{'gpus_available': 4, 'gpus_in_use': 12, 'jobs_running': 8, 'nodes_down': 1, 'nodes_idle': 2, 'nodes_total': 8, 'partition': 'gpu', 'timestamp': '2025-12-13T10:30:00', 'total_gpus': 16}]}}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

timestamp: datetime#
partition: str | None#
total_gpus: int#
gpus_in_use: int#
gpus_available: int#
jobs_running: int#
nodes_total: int#
nodes_idle: int#
nodes_down: int#
class srunx.monitor.WatchMode(*values)[source]#

Bases: StrEnum

Monitoring mode enumeration.

UNTIL_CONDITION = 'until'#

Monitor until condition is met, then exit

CONTINUOUS = 'continuous'#

Monitor indefinitely, notify on every state change