srunx package#
Subpackages#
- srunx.cli package
- srunx.containers package
- srunx.monitor package
- Submodules
- srunx.monitor.base module
- srunx.monitor.job_monitor module
- srunx.monitor.report_types module
- srunx.monitor.resource_monitor module
- srunx.monitor.scheduler module
- srunx.monitor.types module
WatchMode, MonitorConfig, ResourceSnapshot (timestamp, partition, total_gpus, gpus_in_use, gpus_available, jobs_running, nodes_total, nodes_idle, nodes_down, gpu_utilization, has_available_gpus, meets_threshold(), Config, model_config)
- Module contents
BaseMonitor, JobMonitor, ResourceMonitor, MonitorConfig, ResourceSnapshot (Config, gpu_utilization, has_available_gpus, meets_threshold(), model_config, timestamp, partition, total_gpus, gpus_in_use, gpus_available, jobs_running, nodes_total, nodes_idle, nodes_down), WatchMode
- srunx.ssh package
- Subpackages
- Submodules
- srunx.ssh.example module
- Module contents
SSHSlurmClient (__init__(), cleanup_file(), cleanup_job_files(), connect(), disconnect(), execute_command(), file_exists(), get_job_output(), get_job_output_detailed(), get_job_status(), monitor_job(), submit_sbatch_file(), submit_sbatch_job(), sync_project(), tail_log(), test_connection(), upload_file(), validate_remote_script()), SlurmJob
- srunx.sync package
- srunx.web package
Submodules#
srunx.callbacks module#
Callback system for job state notifications.
- class srunx.callbacks.Callback[source]#
Bases: object
Base callback class for job state notifications.
- on_workflow_started(workflow)[source]#
Called when a workflow starts.
- Parameters:
workflow (Workflow) – Workflow that started.
- Return type:
None
- on_workflow_completed(workflow)[source]#
Called when a workflow completes.
- Parameters:
workflow (Workflow) – Workflow that completed.
- Return type:
None
- on_resources_available(snapshot)[source]#
Called when resources become available (threshold met).
- Parameters:
snapshot (ResourceSnapshot) – Resource snapshot at the time resources became available.
- Return type:
None
- on_resources_exhausted(snapshot)[source]#
Called when resources are exhausted (below threshold).
- Parameters:
snapshot (ResourceSnapshot) – Resource snapshot at the time resources were exhausted.
- Return type:
None
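The hooks listed for the base callback are meant to be overridden selectively. The sketch below illustrates that pattern without importing srunx: the `Callback` base and `FakeSnapshot` type are local stand-ins written for this example (only the hook names and the `gpus_available` field name come from this page), and `RecordingCallback` is a hypothetical subclass.

```python
from dataclasses import dataclass


class Callback:
    """Local stand-in mirroring the documented hook names; every hook is a no-op."""

    def on_workflow_started(self, workflow) -> None: ...
    def on_workflow_completed(self, workflow) -> None: ...
    def on_resources_available(self, snapshot) -> None: ...
    def on_resources_exhausted(self, snapshot) -> None: ...


@dataclass
class FakeSnapshot:
    """Hypothetical snapshot carrying only the field this example reads."""
    gpus_available: int


class RecordingCallback(Callback):
    """Overrides only the resource hooks and records one message per event."""

    def __init__(self) -> None:
        self.events: list[str] = []

    def on_resources_available(self, snapshot) -> None:
        self.events.append(f"available: {snapshot.gpus_available} GPUs")

    def on_resources_exhausted(self, snapshot) -> None:
        self.events.append("exhausted")


cb = RecordingCallback()
cb.on_workflow_started("demo")  # inherited no-op, records nothing
cb.on_resources_available(FakeSnapshot(gpus_available=4))
cb.on_resources_exhausted(FakeSnapshot(gpus_available=0))
```

Because the base hooks do nothing, a subclass only needs to implement the events it cares about.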
- class srunx.callbacks.SlackCallback(webhook_url)[source]#
Bases: Callback
Callback that sends notifications to Slack via webhook.
- __init__(webhook_url)[source]#
Initialize Slack callback.
- Parameters:
webhook_url (str) – Slack webhook URL for sending notifications.
- Raises:
ValueError – If webhook_url is not a valid Slack webhook URL.
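This page does not say how the webhook URL is validated. As a rough sketch of what such a check could look like, the hypothetical helper below accepts only HTTPS URLs on `hooks.slack.com` under `/services/`; that accepted shape is an assumption made for illustration, not the rule srunx applies.

```python
from urllib.parse import urlparse


def validate_slack_webhook(url: str) -> str:
    """Return url unchanged if it looks like a Slack incoming webhook, else raise ValueError.

    The accepted shape (https scheme, hooks.slack.com host, /services/ path)
    is an assumption for this sketch.
    """
    parsed = urlparse(url)
    if (
        parsed.scheme != "https"
        or parsed.hostname != "hooks.slack.com"
        or not parsed.path.startswith("/services/")
    ):
        raise ValueError(f"Not a valid Slack webhook URL: {url!r}")
    return url
```

Failing at construction time surfaces a misconfigured URL before any job runs, rather than when the first notification is sent.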
- on_workflow_completed(workflow)[source]#
Send completion notification to Slack.
- Parameters:
workflow (Workflow) – Workflow that completed.
- Return type:
None
- on_resources_available(snapshot)[source]#
Send resource availability notification to Slack.
- Parameters:
snapshot (ResourceSnapshot) – Resource snapshot at the time resources became available.
- Return type:
None
- on_resources_exhausted(snapshot)[source]#
Send resource exhaustion notification to Slack.
- Parameters:
snapshot (ResourceSnapshot) – Resource snapshot at the time resources were exhausted.
- Return type:
None
srunx.client module#
SLURM client for job submission and management.
- class srunx.client.Slurm(default_template=None, callbacks=None)[source]#
Bases: object
Client for interacting with SLURM workload manager.
- __init__(default_template=None, callbacks=None)[source]#
Initialize SLURM client.
- Parameters:
default_template (str | None) – Path to default job template.
callbacks (Sequence[Callback] | None) – List of callbacks.
- submit(job, template_path=None, callbacks=None, verbose=False, record_history=True, workflow_name=None)[source]#
Submit a job to SLURM.
- Parameters:
template_path (str | None) – Optional template path (uses default if not provided).
callbacks (Sequence[Callback] | None) – List of callbacks.
verbose (bool) – Whether to print the rendered content.
record_history (bool) – Whether to record job in history database.
workflow_name (str | None) – Name of the workflow if part of a workflow.
- Return type:
- Returns:
Job instance with updated job_id and status.
- Raises:
subprocess.CalledProcessError – If job submission fails.
- static retrieve(job_id)[source]#
Retrieve job information from SLURM.
- Parameters:
job_id (int) – SLURM job ID.
- Return type:
- Returns:
Job object with current status.
- cancel(job_id)[source]#
Cancel a SLURM job.
- Parameters:
job_id (int) – SLURM job ID to cancel.
- Raises:
subprocess.CalledProcessError – If job cancellation fails.
- Return type:
None
- queue(user=None)[source]#
List jobs for a user.
- Parameters:
user (str | None) – Username (defaults to current user).
- Return type:
list[BaseJob]
- Returns:
List of Job objects.
- run(job, template_path=None, callbacks=None, poll_interval=5, verbose=False, workflow_name=None)[source]#
Submit a job and wait for completion.
- get_job_output(job_id, job_name=None)[source]#
Get job output from SLURM log files.
- Parameters:
job_id (int | str) – SLURM job ID
job_name (str | None) – Job name for better log file detection
- Return type:
tuple[str, str]
- Returns:
Tuple of (output_content, error_content)
- get_job_output_detailed(job_id, job_name=None, skip_content=False)[source]#
Get detailed job output information including found log files.
- Parameters:
job_id (int | str) – SLURM job ID
job_name (str | None) – Job name for better log file detection
skip_content (bool) – If True, only find log files without reading content
- Return type:
dict[str, str | list[str] | None]
- Returns:
Dictionary with detailed log information
- tail_log(job_id, job_name=None, follow=False, last_n=None, poll_interval=1.0)[source]#
Display job logs with optional real-time streaming.
- Parameters:
job_id (int | str) – SLURM job ID
job_name (str | None) – Job name for better log file detection
follow (bool) – If True, continuously stream new log lines (like tail -f)
last_n (int | None) – Show only the last N lines
poll_interval (float) – Polling interval in seconds for follow mode
- Return type:
None
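The `last_n` option of `tail_log` amounts to keeping only the final N lines of a log. One common way to do that is sketched below; `last_lines` is a hypothetical helper written for illustration and is not claimed to be how srunx implements it.

```python
from collections import deque
from typing import Iterable, Optional


def last_lines(lines: Iterable[str], last_n: Optional[int] = None) -> list[str]:
    """Return all lines, or only the final last_n lines when last_n is given."""
    if last_n is None:
        return list(lines)
    # A deque with maxlen keeps only the newest last_n items in a single pass.
    return list(deque(lines, maxlen=last_n))


log = [f"step {i}" for i in range(1, 6)]
tail = last_lines(log, last_n=2)
```

The bounded deque avoids holding the whole file in memory when the input is a lazily read file object.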
- srunx.client.submit_job(job, template_path=None, callbacks=None, verbose=False)[source]#
Submit a job to SLURM (convenience function).
srunx.config module#
Configuration management for srunx.
- class srunx.config.ResourceDefaults(**data)[source]#
Bases: BaseModel
Default resource configuration.
- nodes: int#
- gpus_per_node: int#
- ntasks_per_node: int#
- cpus_per_task: int#
- memory_per_node: str | None#
- time_limit: str | None#
- nodelist: str | None#
- partition: str | None#
- model_config: ClassVar[ConfigDict] = {}#
Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.
- class srunx.config.EnvironmentDefaults(**data)[source]#
Bases: BaseModel
Default environment configuration.
- conda: str | None#
- venv: str | None#
- container: ContainerResource | None#
- env_vars: dict[str, str]#
- model_config: ClassVar[ConfigDict] = {}#
Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.
- class srunx.config.NotificationConfig(**data)[source]#
Bases: BaseModel
Notification configuration.
- slack_webhook_url: str | None#
- model_config: ClassVar[ConfigDict] = {}#
Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.
- class srunx.config.SrunxConfig(**data)[source]#
Bases: BaseModel
Main srunx configuration.
- resources: ResourceDefaults#
- environment: EnvironmentDefaults#
- notifications: NotificationConfig#
- log_dir: str#
- work_dir: str | None#
- model_config: ClassVar[ConfigDict] = {}#
Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.
- srunx.config.get_config_paths()[source]#
Get configuration file paths in order of precedence (lowest to highest).
- Return type:
list[Path]
- srunx.config.load_config_from_file(config_path)[source]#
Load configuration from a JSON file.
- Return type:
dict[str,Any]
- srunx.config.merge_config(base, override)[source]#
Recursively merge configuration dictionaries.
- Return type:
dict[str,Any]
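A recursive merge of this kind usually means that nested dictionaries are combined key by key while other values from the override replace the base. The sketch below shows one way to do it; `merge_dicts` and the sample values are hypothetical and written for this example, so the exact behavior of `merge_config` (for instance on lists) may differ.

```python
from typing import Any


def merge_dicts(base: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]:
    """Return a new dict where override wins, merging nested dicts key by key."""
    merged = dict(base)  # shallow copy so the caller's base is left untouched
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = merge_dicts(merged[key], value)
        else:
            merged[key] = value
    return merged


# Illustrative values only; the key names follow the config fields on this page.
defaults = {"resources": {"nodes": 1, "gpus_per_node": 0}, "log_dir": "logs"}
user = {"resources": {"gpus_per_node": 2}}
result = merge_dicts(defaults, user)
```

Applying this function repeatedly over sources ordered from lowest to highest precedence lets later sources override individual nested keys without discarding their siblings.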
- srunx.config.load_config_from_env()[source]#
Load configuration from environment variables.
- Return type:
dict[str,Any]
- srunx.config.load_config()[source]#
Load configuration from all sources in order of precedence.
- Return type:
- srunx.config.save_user_config(config)[source]#
Save configuration to user config file.
Merges SrunxConfig fields into the existing file so that SSH profile data (managed by ConfigManager) is preserved.
- Return type:
None
- srunx.config.create_example_config()[source]#
Create an example configuration file content.
- Return type:
str
srunx.exceptions module#
- exception srunx.exceptions.WorkflowError[source]#
Bases: Exception
Base exception for workflow errors.
- exception srunx.exceptions.WorkflowValidationError[source]#
Bases: WorkflowError
Exception raised when workflow validation fails.
- exception srunx.exceptions.WorkflowExecutionError[source]#
Bases: WorkflowError
Exception raised when workflow execution fails.
srunx.formatters module#
Unified Slack message formatters with table-based layouts.
- class srunx.formatters.SlackTableFormatter[source]#
Bases: object
Format data as ASCII tables for Slack code blocks.
- static header(title, timestamp=None)[source]#
Create formatted header.
- Parameters:
title (str) – Header title with emoji
timestamp (datetime | None) – Optional timestamp to display
- Return type:
str
- Returns:
Formatted header string
- static box_title(text, width=40)[source]#
Create box with title.
- Parameters:
text (str) – Title text
width (int) – Box width
- Return type:
str
- Returns:
Box string
- static key_value_table(data, width=40)[source]#
Create key-value table.
- Parameters:
data (dict[str, str]) – Dictionary of key-value pairs
width (int) – Table width
- Return type:
str
- Returns:
Formatted table string
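The exact layout produced by `key_value_table` is not shown on this page. As an illustration of a fixed-width key-value table suitable for a Slack code block, the hypothetical function below left-aligns keys and right-aligns values within `width` characters; the separator and alignment are assumptions.

```python
def key_value_table(data: dict[str, str], width: int = 40) -> str:
    """Render key/value pairs as fixed-width rows: key left-aligned, value right-aligned."""
    key_width = max((len(k) for k in data), default=0)
    value_width = max(width - key_width - 3, 0)  # 3 characters for " : "
    rows = [f"{k:<{key_width}} : {v:>{value_width}}" for k, v in data.items()]
    return "\n".join(rows)


table = key_value_table({"Partition": "gpu", "GPUs": "4"}, width=20)
```

Every row comes out at the same width as long as no value is longer than the space left after the widest key, which keeps columns aligned in monospaced rendering.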
- class srunx.formatters.SlackNotificationFormatter[source]#
Bases: object
Format different notification types with unified style.
- job_status_change(job_id, name, old_status, new_status, partition=None, runtime=None, gpus=None, success=True)[source]#
Format job status change notification.
- Parameters:
job_id (int) – Job ID
name (str) – Job name
old_status (str) – Previous status
new_status (str) – Current status
partition (str | None) – SLURM partition
runtime (str | None) – Runtime string
gpus (int | None) – Number of GPUs
success (bool) – Whether the status change is successful
- Return type:
str
- Returns:
Formatted Slack message
- job_status_report(jobs, timestamp=None)[source]#
Format job status report.
- Parameters:
jobs (list[dict]) – List of job dictionaries with keys: id, name, status, runtime, gpus
timestamp (datetime | None) – Report timestamp
- Return type:
str
- Returns:
Formatted Slack message
- resource_available(partition, available_gpus, total_gpus, idle_nodes, total_nodes, utilization)[source]#
Format resource availability notification.
- Parameters:
partition (str | None) – SLURM partition
available_gpus (int) – Number of available GPUs
total_gpus (int) – Total GPUs
idle_nodes (int) – Number of idle nodes
total_nodes (int) – Total nodes
utilization (float) – GPU utilization (0-100)
- Return type:
str
- Returns:
Formatted Slack message
- cluster_status(job_stats=None, resource_stats=None, running_jobs=None, timestamp=None)[source]#
Format cluster status report.
- Parameters:
job_stats (dict | None) – Job statistics dict
resource_stats (dict | None) – Resource statistics dict
running_jobs (list[dict] | None) – List of running job dicts
timestamp (datetime | None) – Report timestamp
- Return type:
str
- Returns:
Formatted Slack message
srunx.history module#
Job execution history tracking with SQLite.
- class srunx.history.JobHistory(db_path=None)[source]#
Bases: object
Manage job execution history in SQLite database.
- __init__(db_path=None)[source]#
Initialize job history manager.
- Parameters:
db_path (str | Path | None) – Path to SQLite database file. Defaults to ~/.srunx/history.db
- update_job_completion(job_id, status, completed_at=None)[source]#
Update job completion information.
- Parameters:
job_id (int) – SLURM job ID
status (JobStatus) – Final job status
completed_at (datetime | None) – Completion timestamp (defaults to now)
- Return type:
None
- get_recent_jobs(limit=100)[source]#
Get recent job executions.
- Parameters:
limit (int) – Maximum number of jobs to return
- Return type:
list[dict[str, Any]]
- Returns:
List of job records
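To make the "recent jobs" query concrete, here is a small sketch against an in-memory SQLite database using the standard `sqlite3` module. The table name, columns, and sample rows are invented for this example; the actual schema of the srunx history database is not described on this page.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # lets each row be converted to a dict
# Hypothetical schema, for this sketch only.
conn.execute(
    "CREATE TABLE jobs (job_id INTEGER PRIMARY KEY, name TEXT, status TEXT, submitted_at TEXT)"
)
conn.executemany(
    "INSERT INTO jobs VALUES (?, ?, ?, ?)",
    [
        (101, "train", "COMPLETED", "2024-01-01T10:00:00"),
        (102, "eval", "FAILED", "2024-01-01T11:00:00"),
        (103, "export", "RUNNING", "2024-01-01T12:00:00"),
    ],
)


def get_recent_jobs(conn: sqlite3.Connection, limit: int = 100) -> list[dict]:
    """Return up to limit job records, newest submission first."""
    cursor = conn.execute(
        "SELECT * FROM jobs ORDER BY submitted_at DESC LIMIT ?", (limit,)
    )
    return [dict(row) for row in cursor.fetchall()]


recent = get_recent_jobs(conn, limit=2)
```

Passing `limit` as a bound parameter rather than formatting it into the SQL string keeps the query safe from injection.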
srunx.logging module#
Centralized logging configuration for srunx.
- srunx.logging.configure_logging(level='INFO', format_string=None, show_time=True, show_level=True, colorize=True)[source]#
Configure loguru logging for srunx.
- Parameters:
level (str) – Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL).
format_string (str | None) – Custom format string. If None, uses default format.
show_time (bool) – Whether to show timestamp in logs.
show_level (bool) – Whether to show log level in logs.
colorize (bool) – Whether to colorize the output.
- Return type:
None
- srunx.logging.configure_cli_logging(level='INFO', quiet=False)[source]#
Configure logging specifically for CLI usage.
- Parameters:
level (str) – Logging level.
quiet (bool) – If True, only show WARNING and above.
- Return type:
None
srunx.models module#
Data models for SLURM job management.
- class srunx.models.JobStatus(*values)[source]#
Bases: Enum
Job status enumeration for both SLURM jobs and workflow jobs.
- UNKNOWN = 'UNKNOWN'#
- PENDING = 'PENDING'#
- RUNNING = 'RUNNING'#
- COMPLETED = 'COMPLETED'#
- FAILED = 'FAILED'#
- CANCELLED = 'CANCELLED'#
- TIMEOUT = 'TIMEOUT'#
- class srunx.models.DependencyType(*values)[source]#
Bases: Enum
Dependency type enumeration for workflow job dependencies.
- AFTER_OK = 'afterok'#
- AFTER = 'after'#
- AFTER_ANY = 'afterany'#
- AFTER_NOT_OK = 'afternotok'#
- class srunx.models.JobDependency(**data)[source]#
Bases: BaseModel
Represents a job dependency with type and target job name.
- job_name: str#
- dep_type: str#
- property dependency_type: DependencyType#
Get the dependency type as a DependencyType enum.
- classmethod parse(dep_str)[source]#
Parse a dependency string into a JobDependency.
Supported formats:
- "job_a" -> afterok:job_a (default behavior)
- "after:job_a" -> after:job_a
- "afterany:job_a" -> afterany:job_a
- "afternotok:job_a" -> afternotok:job_a
- "afterok:job_a" -> afterok:job_a (explicit)
- Return type:
Self
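The supported dependency string formats can be illustrated with a small standalone parser. `ParsedDependency` and `parse_dependency` are hypothetical stand-ins for this example; the mapping of inputs to types follows the formats documented for `parse`, while raising `ValueError` on an unknown prefix is an assumption.

```python
from dataclasses import dataclass

KNOWN_TYPES = ("afterok", "after", "afterany", "afternotok")


@dataclass(frozen=True)
class ParsedDependency:
    """Stand-in for JobDependency, holding just the two documented fields."""
    job_name: str
    dep_type: str


def parse_dependency(dep_str: str) -> ParsedDependency:
    """Split 'type:job' into its parts; a bare job name defaults to afterok."""
    if ":" not in dep_str:
        return ParsedDependency(job_name=dep_str, dep_type="afterok")
    dep_type, job_name = dep_str.split(":", 1)
    if dep_type not in KNOWN_TYPES:
        # Assumed behavior for this sketch: reject unrecognized prefixes.
        raise ValueError(f"Unknown dependency type: {dep_type!r}")
    return ParsedDependency(job_name=job_name, dep_type=dep_type)


parsed = [parse_dependency(s) for s in ("job_a", "after:job_a", "afternotok:job_a")]
```

Splitting on the first colon only keeps job names that themselves contain a colon intact.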
- model_config: ClassVar[ConfigDict] = {}#
Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.
- class srunx.models.JobResource(**data)[source]#
Bases: BaseModel
SLURM resource allocation requirements.
- nodes: int#
- gpus_per_node: int#
- ntasks_per_node: int#
- cpus_per_task: int#
- memory_per_node: str | None#
- time_limit: str | None#
- nodelist: str | None#
- partition: str | None#
- model_config: ClassVar[ConfigDict] = {}#
Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.
- class srunx.models.ContainerResource(**data)[source]#
Bases: BaseModel
Container resource allocation requirements.
Supports Pyxis (--container-* srun flags) and Apptainer/Singularity (apptainer exec command wrapping) runtimes.
Ref (Pyxis): NVIDIA/pyxis
- runtime: Literal['pyxis', 'apptainer', 'singularity']#
- image: str | None#
- mounts: list[str]#
- workdir: str | None#
- nv: bool#
- rocm: bool#
- cleanenv: bool#
- fakeroot: bool#
- writable_tmpfs: bool#
- overlay: str | None#
- env: dict[str, str]#
- validate_runtime_fields()[source]#
Ensure Apptainer-only fields are not set for Pyxis runtime.
- Return type:
Self
- model_config: ClassVar[ConfigDict] = {}#
Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.
- class srunx.models.JobEnvironment(**data)[source]#
Bases: BaseModel
Job environment configuration.
- conda: str | None#
- venv: str | None#
- container: ContainerResource | None#
- env_vars: dict[str, str]#
- model_config: ClassVar[ConfigDict] = {}#
Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.
- class srunx.models.BaseJob(**data)[source]#
Bases: BaseModel
- name: str#
- job_id: int | None#
- depends_on: list[str]#
- retry: int#
- retry_delay: int#
- partition: str | None#
- user: str | None#
- elapsed_time: str | None#
- nodes: int | None#
- nodelist: str | None#
- cpus: int | None#
- gpus: int | None#
- model_post_init(_BaseJob__context)[source]#
Parse string dependencies into JobDependency objects after initialization.
- Return type:
None
- property parsed_dependencies: list[JobDependency]#
Get the parsed dependency objects.
- property status: JobStatus#
Accessing job.status triggers a lightweight refresh, but only if we have a job_id and the status isn’t terminal.
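The refresh-on-access behavior of `status` can be pictured as a property that queries the scheduler only when there is something left to learn. The sketch below is a standalone illustration: `TrackedJob`, the `fetch_status` callable, and the set of terminal states are assumptions made for this example rather than srunx internals.

```python
from typing import Callable, Optional

# Assumed terminal set for this sketch; these states can no longer change.
TERMINAL = {"COMPLETED", "FAILED", "CANCELLED", "TIMEOUT"}


class TrackedJob:
    """Stand-in job whose status property refreshes itself on access."""

    def __init__(self, job_id: Optional[int], fetch_status: Callable[[int], str]) -> None:
        self.job_id = job_id
        self._status = "PENDING"
        self._fetch_status = fetch_status  # hypothetical lookup, e.g. a scheduler query

    @property
    def status(self) -> str:
        # Refresh only when there is a job to query and the state can still change.
        if self.job_id is not None and self._status not in TERMINAL:
            self._status = self._fetch_status(self.job_id)
        return self._status


calls: list[int] = []
answers = iter(["RUNNING", "COMPLETED", "FAILED"])


def fake_fetch(job_id: int) -> str:
    """Fake scheduler lookup that records each call and returns the next canned answer."""
    calls.append(job_id)
    return next(answers)


job = TrackedJob(job_id=7, fetch_status=fake_fetch)
seen = [job.status, job.status, job.status]
```

The third access returns the cached terminal state without another lookup, which is the point of the terminal check.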
- dependencies_satisfied(completed_job_names_or_statuses, started_job_names=None, completed_job_names=None)[source]#
Check if all dependencies are satisfied based on their types.
- Parameters:
completed_job_names_or_statuses (list[str] | dict[str, JobStatus]) – Either a list of completed job names (old interface) or a dict mapping job names to their current status (new interface)
started_job_names (list[str] | None) – List of jobs that have started (unused; kept for backward compatibility)
completed_job_names (list[str] | None) – List of jobs that have completed successfully (kept for backward compatibility)
- Return type:
bool
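How each dependency type might be evaluated against a status mapping (the "new interface") is sketched below. The rules follow the usual meaning of SLURM's `--dependency` types; whether srunx applies exactly these rules is an assumption, and both helper functions are hypothetical.

```python
TERMINAL = {"COMPLETED", "FAILED", "CANCELLED", "TIMEOUT"}


def dependency_met(dep_type: str, status: str) -> bool:
    """Decide whether one dependency is satisfied given the upstream job's status."""
    if dep_type == "afterok":
        return status == "COMPLETED"
    if dep_type == "afternotok":
        return status in TERMINAL and status != "COMPLETED"
    if dep_type == "afterany":
        return status in TERMINAL
    if dep_type == "after":
        # The upstream job has at least started (or already ended).
        return status == "RUNNING" or status in TERMINAL
    raise ValueError(f"Unknown dependency type: {dep_type!r}")


def dependencies_satisfied(deps: list[tuple[str, str]], statuses: dict[str, str]) -> bool:
    """deps holds (dep_type, job_name) pairs; jobs missing from statuses count as UNKNOWN."""
    return all(dependency_met(t, statuses.get(name, "UNKNOWN")) for t, name in deps)


statuses = {"prep": "COMPLETED", "train": "RUNNING", "lint": "FAILED"}
ready = dependencies_satisfied([("afterok", "prep"), ("after", "train")], statuses)
```

A job with no dependencies is trivially satisfied, since `all` over an empty sequence is true.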
- property retry_count: int#
Get the current retry count.
- should_retry()[source]#
Check if the job should be retried based on status and retry count.
- Return type:
bool
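A retry decision of this kind typically combines the job's status with the number of attempts already made. The sketch below assumes that only `FAILED` jobs are retried and that `retry` is the maximum number of retries; both are assumptions, and the function is a hypothetical stand-in.

```python
def should_retry(status: str, retry_count: int, max_retries: int) -> bool:
    """Retry only a failed job that still has attempts left."""
    return status == "FAILED" and retry_count < max_retries


attempts = 0
status = "FAILED"
while should_retry(status, attempts, max_retries=2):
    attempts += 1  # a real runner would resubmit the job here
```

With `max_retries=2` the loop performs exactly two resubmissions before giving up.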
- model_config: ClassVar[ConfigDict] = {}#
Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.
- class srunx.models.Job(**data)[source]#
Bases: BaseJob
Represents a SLURM job with complete configuration.
- command: str | list[str]#
- resources: JobResource#
- environment: JobEnvironment#
- log_dir: str#
- work_dir: str#
- model_config: ClassVar[ConfigDict] = {}#
Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.
- model_post_init(_BaseJob__context)#
Parse string dependencies into JobDependency objects after initialization.
- Return type:
None
- class srunx.models.ShellJob(**data)[source]#
Bases: BaseJob
- script_path: str#
- script_vars: dict[str, str | int | float | bool]#
- model_config: ClassVar[ConfigDict] = {}#
Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.
- model_post_init(_BaseJob__context)#
Parse string dependencies into JobDependency objects after initialization.
- Return type:
None
- class srunx.models.Workflow(name, jobs=None)[source]#
Bases: object
Represents a workflow containing multiple jobs with dependencies.
- srunx.models.render_job_script(template_path, job, output_dir=None, verbose=False)[source]#
Render a SLURM job script from a template.
- Parameters:
template_path (Path | str) – Path to the Jinja template file.
job (Job) – Job configuration.
output_dir (Path | str | None) – Directory where the generated script will be saved.
verbose (bool) – Whether to print the rendered content.
- Return type:
str
- Returns:
Path to the generated SLURM batch script.
- Raises:
FileNotFoundError – If the template file does not exist.
jinja2.TemplateError – If template rendering fails.
- srunx.models.render_shell_job_script(template_path, job, output_dir=None, verbose=False)[source]#
Render a SLURM shell job script from a template.
- Parameters:
template_path (Path | str) – Path to the Jinja template file.
job (ShellJob) – ShellJob configuration.
output_dir (Path | str | None) – Directory where the generated script will be saved.
verbose (bool) – Whether to print the rendered content.
- Return type:
str
- Returns:
Path to the generated SLURM batch script.
- Raises:
FileNotFoundError – If the template file does not exist.
jinja2.TemplateError – If template rendering fails.
srunx.runner module#
Workflow runner for executing YAML-defined workflows with SLURM
- class srunx.runner.WorkflowRunner(workflow, callbacks=None, args=None, default_project=None)[source]#
Bases: object
Runner for executing workflows defined in YAML with dynamic job scheduling.
Jobs are executed as soon as their dependencies are satisfied, rather than waiting for entire dependency levels to complete.
- __init__(workflow, callbacks=None, args=None, default_project=None)[source]#
Initialize workflow runner.
- classmethod from_yaml(yaml_path, callbacks=None, single_job=None)[source]#
Load and validate a workflow from a YAML file.
- Parameters:
yaml_path (str | Path) – Path to the YAML workflow definition file.
callbacks (Sequence[Callback] | None) – List of callbacks for job notifications.
single_job (str | None) – If specified, only load and process this job.
- Return type:
Self
- Returns:
WorkflowRunner instance with loaded workflow.
- Raises:
FileNotFoundError – If the YAML file doesn’t exist.
yaml.YAMLError – If the YAML is malformed.
WorkflowValidationError – If the workflow structure is invalid.
- run(from_job=None, to_job=None, single_job=None)[source]#
Run a workflow with dynamic job scheduling.
Jobs are executed as soon as their dependencies are satisfied.
- Parameters:
from_job (str | None) – Start execution from this job (inclusive), ignoring dependencies
to_job (str | None) – Stop execution at this job (inclusive)
single_job (str | None) – Execute only this specific job, ignoring all dependencies
- Return type:
- Returns:
Dictionary mapping job names to completed Job instances.
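The difference between dynamic scheduling and level-by-level execution can be seen in a small simulation. The function below is a hypothetical model written for this example (it does not call SLURM or srunx): each job starts at the moment its last dependency finishes, regardless of unrelated jobs that are still running.

```python
def schedule_order(deps: dict[str, set[str]], durations: dict[str, int]) -> list[tuple[int, str]]:
    """Simulate dynamic scheduling; return (start_time, job) pairs sorted by start time."""
    finish: dict[str, int] = {}
    starts: list[tuple[int, str]] = []
    pending = set(deps)
    while pending:
        # A job is ready once every one of its dependencies has a known finish time.
        ready = [j for j in pending if deps[j].issubset(finish)]
        if not ready:
            raise ValueError("Dependency cycle or unknown dependency detected")
        for job in sorted(ready):
            start = max((finish[d] for d in deps[job]), default=0)
            finish[job] = start + durations[job]
            starts.append((start, job))
            pending.discard(job)
    return sorted(starts)


# "c" depends only on the short job "a"; "b" is long and unrelated to "c".
deps = {"a": set(), "b": set(), "c": {"a"}, "d": {"b", "c"}}
durations = {"a": 1, "b": 10, "c": 2, "d": 1}
order = schedule_order(deps, durations)
```

Here `c` starts at time 1, right after `a` finishes. A level-based scheduler would hold `c` until the whole first level, including the 10-unit job `b`, had completed.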
srunx.template module#
Job template management for common use cases.
- srunx.template.list_templates()[source]#
List all available templates.
- Return type:
list[dict[str, str]]
- Returns:
List of template information dictionaries.
srunx.utils module#
Utility functions for SLURM job management.
Module contents#
srunx - Python library for SLURM job management.
- class srunx.Slurm(default_template=None, callbacks=None)[source]#
Bases: object
Client for interacting with SLURM workload manager.
- __init__(default_template=None, callbacks=None)[source]#
Initialize SLURM client.
- Parameters:
default_template (str | None) – Path to default job template.
callbacks (Sequence[Callback] | None) – List of callbacks.
- cancel(job_id)[source]#
Cancel a SLURM job.
- Parameters:
job_id (int) – SLURM job ID to cancel.
- Raises:
subprocess.CalledProcessError – If job cancellation fails.
- Return type:
None
- get_job_output(job_id, job_name=None)[source]#
Get job output from SLURM log files.
- Parameters:
job_id (int | str) – SLURM job ID
job_name (str | None) – Job name for better log file detection
- Return type:
tuple[str, str]
- Returns:
Tuple of (output_content, error_content)
- get_job_output_detailed(job_id, job_name=None, skip_content=False)[source]#
Get detailed job output information including found log files.
- Parameters:
job_id (int | str) – SLURM job ID
job_name (str | None) – Job name for better log file detection
skip_content (bool) – If True, only find log files without reading content
- Return type:
dict[str, str | list[str] | None]
- Returns:
Dictionary with detailed log information
- queue(user=None)[source]#
List jobs for a user.
- Parameters:
user (str | None) – Username (defaults to current user).
- Return type:
list[BaseJob]
- Returns:
List of Job objects.
- static retrieve(job_id)[source]#
Retrieve job information from SLURM.
- Parameters:
job_id (int) – SLURM job ID.
- Return type:
- Returns:
Job object with current status.
- run(job, template_path=None, callbacks=None, poll_interval=5, verbose=False, workflow_name=None)[source]#
Submit a job and wait for completion.
- submit(job, template_path=None, callbacks=None, verbose=False, record_history=True, workflow_name=None)[source]#
Submit a job to SLURM.
- Parameters:
template_path (str | None) – Optional template path (uses default if not provided).
callbacks (Sequence[Callback] | None) – List of callbacks.
verbose (bool) – Whether to print the rendered content.
record_history (bool) – Whether to record job in history database.
workflow_name (str | None) – Name of the workflow if part of a workflow.
- Return type:
- Returns:
Job instance with updated job_id and status.
- Raises:
subprocess.CalledProcessError – If job submission fails.
- tail_log(job_id, job_name=None, follow=False, last_n=None, poll_interval=1.0)[source]#
Display job logs with optional real-time streaming.
- Parameters:
job_id (int | str) – SLURM job ID
job_name (str | None) – Job name for better log file detection
follow (bool) – If True, continuously stream new log lines (like tail -f)
last_n (int | None) – Show only the last N lines
poll_interval (float) – Polling interval in seconds for follow mode
- Return type:
None
- srunx.submit_job(job, template_path=None, callbacks=None, verbose=False)[source]#
Submit a job to SLURM (convenience function).
- srunx.retrieve_job(job_id)[source]#
Get job status (convenience function).
- Parameters:
job_id (int) – SLURM job ID.
- Return type:
- srunx.cancel_job(job_id)[source]#
Cancel a job (convenience function).
- Parameters:
job_id (int) – SLURM job ID.
- Return type:
None
- class srunx.Callback[source]#
Bases: object
Base callback class for job state notifications.
- on_resources_available(snapshot)[source]#
Called when resources become available (threshold met).
- Parameters:
snapshot (ResourceSnapshot) – Resource snapshot at the time resources became available.
- Return type:
None
- on_resources_exhausted(snapshot)[source]#
Called when resources are exhausted (below threshold).
- Parameters:
snapshot (ResourceSnapshot) – Resource snapshot at the time resources were exhausted.
- Return type:
None
- on_scheduled_report(report)[source]#
Called when a scheduled report is generated.
- Parameters:
report (Report) – Generated report containing job and resource statistics.
- Return type:
None
- class srunx.SlackCallback(webhook_url)[source]#
Bases: Callback
Callback that sends notifications to Slack via webhook.
- __init__(webhook_url)[source]#
Initialize Slack callback.
- Parameters:
webhook_url (str) – Slack webhook URL for sending notifications.
- Raises:
ValueError – If webhook_url is not a valid Slack webhook URL.
- on_resources_available(snapshot)[source]#
Send resource availability notification to Slack.
- Parameters:
snapshot (ResourceSnapshot) – Resource snapshot at the time resources became available.
- Return type:
None
- on_resources_exhausted(snapshot)[source]#
Send resource exhaustion notification to Slack.
- Parameters:
snapshot (ResourceSnapshot) – Resource snapshot at the time resources were exhausted.
- Return type:
None
- class srunx.ContainerRuntime(*args, **kwargs)[source]#
Bases: Protocol
Protocol for container runtime backends.
- __init__(*args, **kwargs)#
- class srunx.LaunchSpec(prelude='', srun_args='', launch_prefix='')[source]#
Bases: object
Runtime-agnostic container launch specification.
Three distinct outputs model different injection points in generated scripts:
- prelude: Shell setup lines executed before the command (e.g., declare arrays)
- srun_args: Flags passed to srun itself (Pyxis uses this)
- launch_prefix: Command wrapper prepended to the user command (Apptainer uses this)
- __init__(prelude='', srun_args='', launch_prefix='')#
- launch_prefix: str = ''#
- prelude: str = ''#
- srun_args: str = ''#
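The three injection points can be made concrete with a small sketch that assembles a script body from a launch specification. The `LaunchSpec` dataclass here is a local stand-in with the same three fields, and `build_script_body` is a hypothetical helper; how srunx's templates actually place these strings is an assumption.

```python
from dataclasses import dataclass


@dataclass
class LaunchSpec:
    """Local stand-in with the three documented string fields."""
    prelude: str = ""
    srun_args: str = ""
    launch_prefix: str = ""


def build_script_body(spec: LaunchSpec, command: str) -> str:
    """Place each field at its injection point around an srun invocation."""
    parts = ["srun", spec.srun_args, spec.launch_prefix, command]
    srun_line = " ".join(p for p in parts if p)  # skip empty fields
    lines = [spec.prelude, srun_line] if spec.prelude else [srun_line]
    return "\n".join(lines)


# Pyxis-style: the container is selected through a flag on srun itself.
pyxis_like = LaunchSpec(srun_args="--container-image=ubuntu:22.04")
# Apptainer-style: the user command is wrapped by a launcher prefix.
apptainer_like = LaunchSpec(launch_prefix="apptainer exec image.sif")

pyxis_body = build_script_body(pyxis_like, "python train.py")
apptainer_body = build_script_body(apptainer_like, "python train.py")
```

Keeping the three strings separate lets one template serve both runtimes: a backend fills in only the fields that apply to it and leaves the rest empty.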
- class srunx.PyxisRuntime[source]#
Bases: object
Pyxis runtime backend: generates --container-* srun flags.
- class srunx.ApptainerRuntime(binary='apptainer')[source]#
Bases: object
Apptainer/Singularity runtime backend: generates launch_prefix.
- srunx.get_runtime(name)[source]#
Return a container runtime backend by name.
- Parameters:
name (str) – Runtime identifier: "pyxis", "apptainer", or "singularity".
- Return type:
- Returns:
A ContainerRuntime implementation.
- Raises:
ValueError – If the runtime name is not recognized.
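A name-to-backend lookup that raises `ValueError` for unknown names can be sketched with a small registry. The classes and `get_backend` function below are hypothetical stand-ins; in particular, mapping "singularity" to the Apptainer-style backend with a different binary is an assumption suggested by the `binary='apptainer'` parameter, not confirmed behavior.

```python
class PyxisBackend:
    """Stand-in for a backend that would emit srun container flags."""


class ApptainerBackend:
    """Stand-in for a backend that would wrap the command with a launcher binary."""

    def __init__(self, binary: str = "apptainer") -> None:
        self.binary = binary


# Registry of factories keyed by runtime name (assumed mapping for this sketch).
_FACTORIES = {
    "pyxis": PyxisBackend,
    "apptainer": lambda: ApptainerBackend("apptainer"),
    "singularity": lambda: ApptainerBackend("singularity"),
}


def get_backend(name: str):
    """Return a new backend for name, or raise ValueError if the name is unknown."""
    try:
        factory = _FACTORIES[name]
    except KeyError:
        known = ", ".join(sorted(_FACTORIES))
        raise ValueError(f"Unknown runtime {name!r}; expected one of: {known}") from None
    return factory()
```

Listing the accepted names in the error message makes a typo in a configuration file easy to diagnose.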
- class srunx.BaseJob(**data)[source]#
Bases: BaseModel
- dependencies_satisfied(completed_job_names_or_statuses, started_job_names=None, completed_job_names=None)[source]#
Check if all dependencies are satisfied based on their types.
- Parameters:
completed_job_names_or_statuses (list[str] | dict[str, JobStatus]) – Either a list of completed job names (old interface) or a dict mapping job names to their current status (new interface)
started_job_names (list[str] | None) – List of jobs that have started (unused; kept for backward compatibility)
completed_job_names (list[str] | None) – List of jobs that have completed successfully (kept for backward compatibility)
- Return type:
bool
- model_config: ClassVar[ConfigDict] = {}#
Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.
- model_post_init(_BaseJob__context)[source]#
Parse string dependencies into JobDependency objects after initialization.
- Return type:
None
- property parsed_dependencies: list[JobDependency]#
Get the parsed dependency objects.
- property retry_count: int#
Get the current retry count.
- should_retry()[source]#
Check if the job should be retried based on status and retry count.
- Return type:
bool
- property status: JobStatus#
Accessing job.status triggers a lightweight refresh, but only if we have a job_id and the status isn’t terminal.
- name: str#
- job_id: int | None#
- depends_on: list[str]#
- retry: int#
- retry_delay: int#
- partition: str | None#
- user: str | None#
- elapsed_time: str | None#
- nodes: int | None#
- nodelist: str | None#
- cpus: int | None#
- gpus: int | None#
- class srunx.ContainerResource(**data)[source]#
Bases: BaseModel
Container resource allocation requirements.
Supports Pyxis (--container-* srun flags) and Apptainer/Singularity (apptainer exec command wrapping) runtimes.
Ref (Pyxis): NVIDIA/pyxis
- model_config: ClassVar[ConfigDict] = {}#
Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.
- validate_runtime_fields()[source]#
Ensure Apptainer-only fields are not set for Pyxis runtime.
- Return type:
Self
- runtime: Literal['pyxis', 'apptainer', 'singularity']#
- image: str | None#
- mounts: list[str]#
- workdir: str | None#
- nv: bool#
- rocm: bool#
- cleanenv: bool#
- fakeroot: bool#
- writable_tmpfs: bool#
- overlay: str | None#
- env: dict[str, str]#
- class srunx.Job(**data)[source]#
Bases: BaseJob
Represents a SLURM job with complete configuration.
- model_config: ClassVar[ConfigDict] = {}#
Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.
- model_post_init(_BaseJob__context)#
Parse string dependencies into JobDependency objects after initialization.
- Return type:
None
- command: str | list[str]#
- resources: JobResource#
- environment: JobEnvironment#
- log_dir: str#
- work_dir: str#
- name: str#
- job_id: int | None#
- depends_on: list[str]#
- retry: int#
- retry_delay: int#
- partition: str | None#
- user: str | None#
- elapsed_time: str | None#
- nodes: int | None#
- nodelist: str | None#
- cpus: int | None#
- gpus: int | None#
- class srunx.ShellJob(**data)[source]#
Bases: BaseJob
- model_config: ClassVar[ConfigDict] = {}#
Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.
- model_post_init(_BaseJob__context)#
Parse string dependencies into JobDependency objects after initialization.
- Return type:
None
- script_path: str#
- script_vars: dict[str, str | int | float | bool]#
- name: str#
- job_id: int | None#
- depends_on: list[str]#
- retry: int#
- retry_delay: int#
- partition: str | None#
- user: str | None#
- elapsed_time: str | None#
- nodes: int | None#
- nodelist: str | None#
- cpus: int | None#
- gpus: int | None#
- class srunx.JobResource(**data)[source]#
Bases:
BaseModel
SLURM resource allocation requirements.
- model_config: ClassVar[ConfigDict] = {}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
-
nodes:
int#
-
gpus_per_node:
int#
-
ntasks_per_node:
int#
-
cpus_per_task:
int#
-
memory_per_node:
str|None#
-
time_limit:
str|None#
-
nodelist:
str|None#
-
partition:
str|None#
- class srunx.JobEnvironment(**data)[source]#
Bases:
BaseModel
Job environment configuration.
- model_config: ClassVar[ConfigDict] = {}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
-
conda:
str|None#
-
venv:
str|None#
-
container:
ContainerResource|None#
-
env_vars:
dict[str,str]#
- class srunx.JobStatus(*values)[source]#
Bases:
Enum
Job status enumeration for both SLURM jobs and workflow jobs.
- UNKNOWN = 'UNKNOWN'#
- PENDING = 'PENDING'#
- RUNNING = 'RUNNING'#
- COMPLETED = 'COMPLETED'#
- FAILED = 'FAILED'#
- CANCELLED = 'CANCELLED'#
- TIMEOUT = 'TIMEOUT'#
- class srunx.Workflow(name, jobs=None)[source]#
Bases:
object
Represents a workflow containing multiple jobs with dependencies.
- srunx.render_job_script(template_path, job, output_dir=None, verbose=False)[source]#
Render a SLURM job script from a template.
- Parameters:
template_path (Path|str) – Path to the Jinja template file.
job (Job) – Job configuration.
output_dir (Path|str|None) – Directory where the generated script will be saved.
verbose (bool) – Whether to print the rendered content.
- Return type:
str
- Returns:
Path to the generated SLURM batch script.
- Raises:
FileNotFoundError – If the template file does not exist.
jinja2.TemplateError – If template rendering fails.
- class srunx.JobMonitor(job_ids, target_statuses=None, config=None, callbacks=None, client=None)[source]#
Bases:
BaseMonitor
Monitor SLURM jobs until they reach terminal states.
Polls jobs at configured intervals and notifies callbacks on state transitions. Supports monitoring single or multiple jobs with target status detection.
- __init__(job_ids, target_statuses=None, config=None, callbacks=None, client=None)[source]#
Initialize job monitor.
- Parameters:
job_ids (list[int]) – List of SLURM job IDs to monitor.
target_statuses (list[JobStatus]|None) – Terminal statuses to wait for. Defaults to [COMPLETED, FAILED, CANCELLED, TIMEOUT].
config (MonitorConfig|None) – Monitoring configuration. Defaults to MonitorConfig() if None.
callbacks (list[Callback]|None) – List of notification callbacks. Defaults to empty list if None.
client (Slurm|None) – SLURM client instance. Defaults to Slurm() if None.
- Raises:
ValueError – If job_ids is empty.
- class srunx.ResourceMonitor(min_gpus, partition=None, config=None, callbacks=None)[source]#
Bases:
BaseMonitor
Monitor SLURM GPU resources until availability threshold is met.
Polls partition resources at configured intervals and notifies callbacks when resources become available or exhausted.
- __init__(min_gpus, partition=None, config=None, callbacks=None)[source]#
Initialize resource monitor.
- Parameters:
min_gpus (int) – Minimum number of GPUs required for threshold.
partition (str|None) – SLURM partition to monitor. Defaults to all partitions if None.
config (MonitorConfig|None) – Monitoring configuration. Defaults to MonitorConfig() if None.
callbacks (list[Callback]|None) – List of notification callbacks. Defaults to empty list if None.
- Raises:
ValueError – If min_gpus < 0.
- check_condition()[source]#
Check if resource availability threshold is met.
- Return type:
bool
- Returns:
True if available GPUs >= min_gpus threshold, False otherwise.
- Raises:
SlurmError – If SLURM command fails.
- get_current_state()[source]#
Get current resource state for comparison and logging.
- Return type:
dict[str,Any]
- Returns:
Dictionary with current resource state. Format: {"partition": str | None, "gpus_available": int, "gpus_total": int, "meets_threshold": bool}
- Raises:
SlurmError – If SLURM command fails.
- get_partition_resources()[source]#
Query SLURM for GPU resource availability.
Uses sinfo to get total GPUs per partition and squeue to get GPUs in use. Filters out DOWN/DRAIN/DRAINING nodes from availability calculation.
- Return type:
ResourceSnapshot
- Returns:
ResourceSnapshot with current resource state.
- Raises:
SlurmError – If SLURM command fails.
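The availability arithmetic described for get_partition_resources() can be illustrated without calling sinfo or squeue. The sketch below assumes the command output has already been parsed into (state, gpu_count) pairs; `summarize_gpus` and that record shape are hypothetical, and only the exclusion of DOWN/DRAIN/DRAINING nodes comes from the description above.

```python
# Node states excluded from the availability calculation.
UNAVAILABLE_STATES = {"DOWN", "DRAIN", "DRAINING"}


def summarize_gpus(nodes, gpus_in_use):
    """Summarize GPU availability.

    nodes: list of (state, gpu_count) tuples, assumed pre-parsed.
    gpus_in_use: GPUs currently allocated to running jobs.
    """
    usable = [(state, gpus) for state, gpus in nodes
              if state.upper() not in UNAVAILABLE_STATES]
    total = sum(gpus for _, gpus in usable)
    return {
        "total_gpus": total,
        "gpus_in_use": gpus_in_use,
        # Clamp at zero in case the usage count lags behind node state changes.
        "gpus_available": max(total - gpus_in_use, 0),
    }


nodes = [("idle", 4), ("mixed", 4), ("allocated", 4), ("down", 4), ("drain", 4)]
summary = summarize_gpus(nodes, gpus_in_use=6)
```

Here the two unavailable nodes contribute nothing, so 12 GPUs count toward the total and 6 remain available.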
- class srunx.MonitorConfig(**data)[source]#
Bases:
BaseModel
Configuration for monitoring operations.
- class Config[source]#
Bases:
object
- json_schema_extra = {'examples': [{'mode': 'until', 'notify_on_change': True, 'poll_interval': 60, 'timeout': 3600}, {'mode': 'continuous', 'notify_on_change': True, 'poll_interval': 5, 'timeout': None}]}#
- property is_aggressive: bool#
Check if polling interval is aggressive (<5 seconds).
- model_config: ClassVar[ConfigDict] = {'json_schema_extra': {'examples': [{'mode': 'until', 'notify_on_change': True, 'poll_interval': 60, 'timeout': 3600}, {'mode': 'continuous', 'notify_on_change': True, 'poll_interval': 5, 'timeout': None}]}}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
-
poll_interval:
int#
-
timeout:
int|None#
-
notify_on_change:
bool#
- class srunx.ResourceSnapshot(**data)[source]#
Bases:
BaseModel
Point-in-time snapshot of SLURM partition resources.
- class Config[source]#
Bases:
object
- json_schema_extra = {'examples': [{'gpus_available': 4, 'gpus_in_use': 12, 'jobs_running': 8, 'nodes_down': 1, 'nodes_idle': 2, 'nodes_total': 8, 'partition': 'gpu', 'timestamp': '2025-12-13T10:30:00', 'total_gpus': 16}]}#
- property gpu_utilization: float#
GPU utilization as a fraction (0.0 to 1.0).
- property has_available_gpus: bool#
Check if any GPUs are available.
- meets_threshold(min_gpus)[source]#
Check if available GPUs meet minimum threshold.
- Parameters:
min_gpus (int) – Minimum required GPUs.
- Return type:
bool
- Returns:
True if gpus_available >= min_gpus.
- model_config: ClassVar[ConfigDict] = {'json_schema_extra': {'examples': [{'gpus_available': 4, 'gpus_in_use': 12, 'jobs_running': 8, 'nodes_down': 1, 'nodes_idle': 2, 'nodes_total': 8, 'partition': 'gpu', 'timestamp': '2025-12-13T10:30:00', 'total_gpus': 16}]}}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
-
timestamp:
datetime#
-
partition:
str|None#
-
total_gpus:
int#
-
gpus_in_use:
int#
-
gpus_available:
int#
-
jobs_running:
int#
-
nodes_total:
int#
-
nodes_idle:
int#
-
nodes_down:
int#
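The derived properties of ResourceSnapshot can be mirrored with a plain dataclass, using the values from the schema example above. `SketchSnapshot` is a hypothetical stand-in that omits the job and node counters, and the utilization formula (gpus_in_use divided by total_gpus, 0.0 when there are no GPUs) is an assumption rather than the documented definition.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class SketchSnapshot:
    """Hypothetical stand-in for a point-in-time resource snapshot."""

    timestamp: datetime
    partition: Optional[str]
    total_gpus: int
    gpus_in_use: int
    gpus_available: int

    @property
    def gpu_utilization(self) -> float:
        # Assumption: fraction of GPUs in use; 0.0 when no GPUs exist.
        if self.total_gpus == 0:
            return 0.0
        return self.gpus_in_use / self.total_gpus

    @property
    def has_available_gpus(self) -> bool:
        return self.gpus_available > 0

    def meets_threshold(self, min_gpus: int) -> bool:
        # Same rule as documented: gpus_available >= min_gpus.
        return self.gpus_available >= min_gpus


snap = SketchSnapshot(
    timestamp=datetime(2025, 12, 13, 10, 30),
    partition="gpu",
    total_gpus=16,
    gpus_in_use=12,
    gpus_available=4,
)
```

For these values the sketch reports a utilization of 0.75, and `snap.meets_threshold(4)` holds while `snap.meets_threshold(5)` does not.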
- class srunx.WatchMode(*values)[source]#
Bases:
StrEnum
Monitoring mode enumeration.
- UNTIL_CONDITION = 'until'#
Monitor until condition is met, then exit
- CONTINUOUS = 'continuous'#
Monitor indefinitely, notify on every state change
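The difference between the two modes can be sketched as follows. `SketchWatchMode` and `watch` are hypothetical; a `str`-based Enum is used here instead of StrEnum so the example also runs on Python versions before 3.11, and a finite list of readings stands in for real polling.

```python
from enum import Enum


class SketchWatchMode(str, Enum):
    UNTIL_CONDITION = "until"
    CONTINUOUS = "continuous"


def watch(states, condition, mode, notify):
    """Consume observed states; return True if the 'until' condition was met."""
    previous = None
    for index, state in enumerate(states):
        if mode is SketchWatchMode.UNTIL_CONDITION:
            if condition(state):
                notify(state)  # notify once, then stop watching
                return True
        elif index > 0 and state != previous:
            notify(state)  # continuous mode: report every change
        previous = state
    return False


readings = [0, 0, 2, 2, 4, 1]  # e.g. available GPUs at successive polls

until_events = []
met = watch(readings, lambda gpus: gpus >= 4,
            SketchWatchMode.UNTIL_CONDITION, until_events.append)

continuous_events = []
watch(readings, lambda gpus: gpus >= 4,
      SketchWatchMode.CONTINUOUS, continuous_events.append)
```

In "until" mode the watcher stops at the first reading that satisfies the condition; in "continuous" mode it keeps going and reports each change, here 2, 4, and 1.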
- class srunx.WorkflowRunner(workflow, callbacks=None, args=None, default_project=None)[source]#
Bases:
object
Runner for executing workflows defined in YAML with dynamic job scheduling.
Jobs are executed as soon as their dependencies are satisfied, rather than waiting for entire dependency levels to complete.
- __init__(workflow, callbacks=None, args=None, default_project=None)[source]#
Initialize workflow runner.
- classmethod from_yaml(yaml_path, callbacks=None, single_job=None)[source]#
Load and validate a workflow from a YAML file.
- Parameters:
yaml_path (str|Path) – Path to the YAML workflow definition file.
callbacks (Sequence[Callback]|None) – List of callbacks for job notifications.
single_job (str|None) – If specified, only load and process this job.
- Return type:
Self
- Returns:
WorkflowRunner instance with loaded workflow.
- Raises:
FileNotFoundError – If the YAML file doesn’t exist.
yaml.YAMLError – If the YAML is malformed.
WorkflowValidationError – If the workflow structure is invalid.
- run(from_job=None, to_job=None, single_job=None)[source]#
Run a workflow with dynamic job scheduling.
Jobs are executed as soon as their dependencies are satisfied.
- Parameters:
from_job (str|None) – Start execution from this job (inclusive), ignoring dependencies.
to_job (str|None) – Stop execution at this job (inclusive).
single_job (str|None) – Execute only this specific job, ignoring all dependencies.
- Return type:
- Returns:
Dictionary mapping job names to completed Job instances.
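Dynamic scheduling, where a job starts as soon as its own dependencies finish instead of waiting for a whole dependency level, can be illustrated with a small event simulation. This is a sketch, not the runner's implementation: `simulate`, the job names, and the fixed durations are hypothetical.

```python
import heapq


def simulate(jobs, durations):
    """Return each job's start time under dynamic scheduling.

    jobs: name -> list of dependency names.
    durations: name -> run time in arbitrary units.
    """
    waiting = {name: set(deps) for name, deps in jobs.items()}
    start, running, now = {}, [], 0
    while waiting or running:
        # Launch every job whose dependencies are all satisfied.
        ready = sorted(name for name, deps in waiting.items() if not deps)
        for name in ready:
            del waiting[name]
            start[name] = now
            heapq.heappush(running, (now + durations[name], name))
        if not running:
            raise ValueError(f"unsatisfiable dependencies: {sorted(waiting)}")
        # Advance time to the next job completion and release its dependents.
        now, finished = heapq.heappop(running)
        for deps in waiting.values():
            deps.discard(finished)
    return start


jobs = {
    "prep_a": [],
    "prep_b": [],
    "train": ["prep_a"],
    "report": ["train", "prep_b"],
}
durations = {"prep_a": 1, "prep_b": 5, "train": 2, "report": 1}
start_times = simulate(jobs, durations)
```

In this example "train" starts at time 1, right after "prep_a" finishes; a level-by-level scheduler would hold it until the slower "prep_b" completed at time 5.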
- srunx.configure_logging(level='INFO', format_string=None, show_time=True, show_level=True, colorize=True)[source]#
Configure loguru logging for srunx.
- Parameters:
level (str) – Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL).
format_string (str|None) – Custom format string. If None, uses default format.
show_time (bool) – Whether to show timestamp in logs.
show_level (bool) – Whether to show log level in logs.
colorize (bool) – Whether to colorize the output.
- Return type:
None
- srunx.configure_cli_logging(level='INFO', quiet=False)[source]#
Configure logging specifically for CLI usage.
- Parameters:
level (str) – Logging level.
quiet (bool) – If True, only show WARNING and above.
- Return type:
None