srunx package

Subpackages#

Submodules#

srunx.callbacks module#

Callback system for job state notifications.

class srunx.callbacks.Callback[source]#

Bases: object

Base callback class for job state notifications.

on_job_submitted(job)[source]#

Called when a job is submitted to SLURM.

Parameters:

job (BaseJob | Job | ShellJob) – Job that was submitted.

Return type:

None

on_job_completed(job)[source]#

Called when a job completes successfully.

Parameters:

job (BaseJob | Job | ShellJob) – Job that completed.

Return type:

None

on_job_failed(job)[source]#

Called when a job fails.

Parameters:

job (BaseJob | Job | ShellJob) – Job that failed.

Return type:

None

on_job_running(job)[source]#

Called when a job starts running.

Parameters:

job (BaseJob | Job | ShellJob) – Job that started running.

Return type:

None

on_job_cancelled(job)[source]#

Called when a job is cancelled.

Parameters:

job (BaseJob | Job | ShellJob) – Job that was cancelled.

Return type:

None

on_workflow_started(workflow)[source]#

Called when a workflow starts.

Parameters:

workflow (Workflow) – Workflow that started.

Return type:

None

on_workflow_completed(workflow)[source]#

Called when a workflow completes.

Parameters:

workflow (Workflow) – Workflow that completed.

Return type:

None

on_resources_available(snapshot)[source]#

Called when resources become available (threshold met).

Parameters:

snapshot (ResourceSnapshot) – Resource snapshot at the time resources became available.

Return type:

None

on_resources_exhausted(snapshot)[source]#

Called when resources are exhausted (below threshold).

Parameters:

snapshot (ResourceSnapshot) – Resource snapshot at the time resources were exhausted.

Return type:

None

on_scheduled_report(report)[source]#

Called when a scheduled report is generated.

Parameters:

report (Report) – Generated report containing job and resource statistics.

Return type:

None
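
A custom callback overrides only the hooks it cares about and inherits no-op behaviour for the rest. The sketch below is illustrative only: it uses a stand-in base class and a stand-in job object so that it runs without srunx installed. In real code the base class would be srunx.callbacks.Callback; the names RecordingCallback and FakeJob are hypothetical.

```python
from dataclasses import dataclass


class Callback:
    """Stand-in for srunx.callbacks.Callback: every hook is a no-op."""

    def on_job_submitted(self, job) -> None: ...
    def on_job_completed(self, job) -> None: ...
    def on_job_failed(self, job) -> None: ...


@dataclass
class FakeJob:
    """Hypothetical minimal job; real jobs are BaseJob, Job, or ShellJob."""

    name: str
    job_id: int


class RecordingCallback(Callback):
    """Collects one line per event instead of sending it anywhere."""

    def __init__(self) -> None:
        self.events = []

    def on_job_completed(self, job) -> None:
        self.events.append(f"completed {job.name} ({job.job_id})")

    def on_job_failed(self, job) -> None:
        self.events.append(f"failed {job.name} ({job.job_id})")


recorder = RecordingCallback()
job = FakeJob(name="train", job_id=12345)
recorder.on_job_submitted(job)  # inherited no-op, records nothing
recorder.on_job_completed(job)
print(recorder.events)
```

Because unhandled hooks fall through to the base class, a subclass like this can be passed anywhere a sequence of callbacks is accepted without implementing every method.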

class srunx.callbacks.SlackCallback(webhook_url)[source]#

Bases: Callback

Callback that sends notifications to Slack via webhook.

__init__(webhook_url)[source]#

Initialize Slack callback.

Parameters:

webhook_url (str) – Slack webhook URL for sending notifications.

Raises:

ValueError – If webhook_url is not a valid Slack webhook URL.

on_job_submitted(job)[source]#

Send submission notification to Slack.

Parameters:

job (BaseJob | Job | ShellJob) – Job that was submitted.

Return type:

None

on_job_completed(job)[source]#

Send completion notification to Slack.

Parameters:

job (BaseJob | Job | ShellJob) – Job that completed.

Return type:

None

on_job_failed(job)[source]#

Send failure notification to Slack.

Parameters:

job (BaseJob | Job | ShellJob) – Job that failed.

Return type:

None

on_job_running(job)[source]#

Send running notification to Slack.

Parameters:

job (BaseJob | Job | ShellJob) – Job that started running.

Return type:

None

on_job_cancelled(job)[source]#

Send cancellation notification to Slack.

Parameters:

job (BaseJob | Job | ShellJob) – Job that was cancelled.

Return type:

None

on_workflow_completed(workflow)[source]#

Send workflow completion notification to Slack.

Parameters:

workflow (Workflow) – Workflow that completed.

Return type:

None

on_resources_available(snapshot)[source]#

Send resource availability notification to Slack.

Parameters:

snapshot (ResourceSnapshot) – Resource snapshot at the time resources became available.

Return type:

None

on_resources_exhausted(snapshot)[source]#

Send resource exhaustion notification to Slack.

Parameters:

snapshot (ResourceSnapshot) – Resource snapshot at the time resources were exhausted.

Return type:

None

on_scheduled_report(report)[source]#

Send scheduled report to Slack.

Parameters:

report (Report) – Generated report containing job and resource statistics.

Return type:

None

srunx.client module#

SLURM client for job submission and management.

class srunx.client.Slurm(default_template=None, callbacks=None)[source]#

Bases: object

Client for interacting with SLURM workload manager.

__init__(default_template=None, callbacks=None)[source]#

Initialize SLURM client.

Parameters:
  • default_template (str | None) – Path to default job template.

  • callbacks (Sequence[Callback] | None) – List of callbacks.

submit(job, template_path=None, callbacks=None, verbose=False, record_history=True, workflow_name=None)[source]#

Submit a job to SLURM.

Parameters:
  • job (Job | ShellJob) – Job configuration.

  • template_path (str | None) – Optional template path (uses default if not provided).

  • callbacks (Sequence[Callback] | None) – List of callbacks.

  • verbose (bool) – Whether to print the rendered content.

  • record_history (bool) – Whether to record job in history database.

  • workflow_name (str | None) – Name of the workflow if part of a workflow.

Return type:

Job | ShellJob

Returns:

Job instance with updated job_id and status.

Raises:

subprocess.CalledProcessError – If job submission fails.

static retrieve(job_id)[source]#

Retrieve job information from SLURM.

Parameters:

job_id (int) – SLURM job ID.

Return type:

BaseJob

Returns:

Job object with current status.

cancel(job_id)[source]#

Cancel a SLURM job.

Parameters:

job_id (int) – SLURM job ID to cancel.

Raises:

subprocess.CalledProcessError – If job cancellation fails.

Return type:

None

queue(user=None)[source]#

List jobs for a user.

Parameters:

user (str | None) – Username (defaults to current user).

Return type:

list[BaseJob]

Returns:

List of Job objects.

monitor(job_obj_or_id, poll_interval=5, callbacks=None)[source]#

Wait for a job to complete.

Parameters:
  • job_obj_or_id (BaseJob | Job | ShellJob | int) – Job object or job ID.

  • poll_interval (int) – Polling interval in seconds.

  • callbacks (Sequence[Callback] | None) – List of callbacks.

Return type:

BaseJob | Job | ShellJob

Returns:

Completed job object.

Raises:

RuntimeError – If job fails.
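
Waiting for a job amounts to polling its status until it reaches a terminal state. The following is a minimal sketch of such a loop, not the library's implementation. It assumes a caller-supplied get_status function (hypothetical, standing in for a SLURM query) and assumes that FAILED, CANCELLED, and TIMEOUT all count as failures.

```python
import time
from typing import Callable

TERMINAL_OK = {"COMPLETED"}
TERMINAL_BAD = {"FAILED", "CANCELLED", "TIMEOUT"}  # assumption, see lead-in


def wait_for_job(
    job_id: int,
    get_status: Callable[[int], str],
    poll_interval: float = 5,
) -> str:
    """Poll get_status until the job finishes; raise if it did not succeed."""
    while True:
        status = get_status(job_id)
        if status in TERMINAL_OK:
            return status
        if status in TERMINAL_BAD:
            raise RuntimeError(f"Job {job_id} ended with status {status}")
        time.sleep(poll_interval)


# Simulated status sequence in place of a real scheduler query.
_statuses = iter(["PENDING", "RUNNING", "COMPLETED"])
final_status = wait_for_job(1, lambda _job_id: next(_statuses), poll_interval=0)
print(final_status)
```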

run(job, template_path=None, callbacks=None, poll_interval=5, verbose=False, workflow_name=None)[source]#

Submit a job and wait for completion.

Return type:

Job | ShellJob

get_job_output(job_id, job_name=None)[source]#

Get job output from SLURM log files.

Parameters:
  • job_id (int | str) – SLURM job ID

  • job_name (str | None) – Job name for better log file detection

Return type:

tuple[str, str]

Returns:

Tuple of (output_content, error_content)

get_job_output_detailed(job_id, job_name=None, skip_content=False)[source]#

Get detailed job output information including found log files.

Parameters:
  • job_id (int | str) – SLURM job ID

  • job_name (str | None) – Job name for better log file detection

  • skip_content (bool) – If True, only find log files without reading content

Return type:

dict[str, str | list[str] | None]

Returns:

Dictionary with detailed log information

tail_log(job_id, job_name=None, follow=False, last_n=None, poll_interval=1.0)[source]#

Display job logs with optional real-time streaming.

Parameters:
  • job_id (int | str) – SLURM job ID

  • job_name (str | None) – Job name for better log file detection

  • follow (bool) – If True, continuously stream new log lines (like tail -f)

  • last_n (int | None) – Show only the last N lines

  • poll_interval (float) – Polling interval in seconds for follow mode

Return type:

None
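
The last_n behaviour can be pictured as keeping only a bounded window of lines while reading the log. A minimal sketch under that assumption follows; last_lines is a hypothetical helper, and the temporary file merely stands in for a SLURM log file. Follow mode is not shown.

```python
import os
import tempfile
from collections import deque
from typing import Optional


def last_lines(path: str, last_n: Optional[int] = None) -> list:
    """Return the file's lines, or only the final last_n lines when given."""
    with open(path, encoding="utf-8") as handle:
        # A deque with maxlen discards older lines as newer ones arrive.
        lines = handle if last_n is None else deque(handle, maxlen=last_n)
        return [line.rstrip("\n") for line in lines]


# Demonstrate on a throwaway file standing in for a job log.
with tempfile.NamedTemporaryFile(
    "w", suffix=".out", delete=False, encoding="utf-8"
) as tmp:
    tmp.write("epoch 1\nepoch 2\nepoch 3\n")

tail = last_lines(tmp.name, last_n=2)
everything = last_lines(tmp.name)
os.remove(tmp.name)
print(tail)
```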

srunx.client.submit_job(job, template_path=None, callbacks=None, verbose=False)[source]#

Submit a job to SLURM (convenience function).

Parameters:
  • job (Job | ShellJob) – Job configuration.

  • template_path (str | None) – Optional template path (uses default if not provided).

  • callbacks (Sequence[Callback] | None) – List of callbacks.

  • verbose (bool) – Whether to print the rendered content.

Return type:

Job | ShellJob

srunx.client.retrieve_job(job_id)[source]#

Get job status (convenience function).

Parameters:

job_id (int) – SLURM job ID.

Return type:

BaseJob

srunx.client.cancel_job(job_id)[source]#

Cancel a job (convenience function).

Parameters:

job_id (int) – SLURM job ID.

Return type:

None

srunx.config module#

Configuration management for srunx.

class srunx.config.ResourceDefaults(**data)[source]#

Bases: BaseModel

Default resource configuration.

nodes: int#
gpus_per_node: int#
ntasks_per_node: int#
cpus_per_task: int#
memory_per_node: str | None#
time_limit: str | None#
nodelist: str | None#
partition: str | None#
model_config: ClassVar[ConfigDict] = {}#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class srunx.config.EnvironmentDefaults(**data)[source]#

Bases: BaseModel

Default environment configuration.

conda: str | None#
venv: str | None#
container: ContainerResource | None#
env_vars: dict[str, str]#
model_config: ClassVar[ConfigDict] = {}#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class srunx.config.NotificationConfig(**data)[source]#

Bases: BaseModel

Notification configuration.

slack_webhook_url: str | None#
model_config: ClassVar[ConfigDict] = {}#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class srunx.config.SrunxConfig(**data)[source]#

Bases: BaseModel

Main srunx configuration.

resources: ResourceDefaults#
environment: EnvironmentDefaults#
notifications: NotificationConfig#
log_dir: str#
work_dir: str | None#
model_config: ClassVar[ConfigDict] = {}#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

srunx.config.get_config_paths()[source]#

Get configuration file paths in order of precedence (lowest to highest).

Return type:

list[Path]

srunx.config.load_config_from_file(config_path)[source]#

Load configuration from a JSON file.

Return type:

dict[str, Any]

srunx.config.merge_config(base, override)[source]#

Recursively merge configuration dictionaries.

Return type:

dict[str, Any]
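
A recursive merge lets a higher-precedence source override individual nested keys without discarding their siblings. The sketch below shows one common way to do this and is not necessarily how srunx implements it; in particular, returning a new dictionary rather than mutating base is an assumption.

```python
from typing import Any


def merge_config(base: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]:
    """Return a new dict; override wins, nested dicts are merged key by key."""
    merged = dict(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = merge_config(merged[key], value)
        else:
            merged[key] = value
    return merged


defaults = {"resources": {"nodes": 1, "gpus_per_node": 0}, "log_dir": "logs"}
user = {"resources": {"gpus_per_node": 4}}
merged = merge_config(defaults, user)
print(merged)
```

Here the user file changes only gpus_per_node, while nodes and log_dir keep their default values.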

srunx.config.load_config_from_env()[source]#

Load configuration from environment variables.

Return type:

dict[str, Any]

srunx.config.load_config()[source]#

Load configuration from all sources in order of precedence.

Return type:

SrunxConfig

srunx.config.save_user_config(config)[source]#

Save configuration to user config file.

Merges SrunxConfig fields into the existing file so that SSH profile data (managed by ConfigManager) is preserved.

Return type:

None

srunx.config.create_example_config()[source]#

Create an example configuration file content.

Return type:

str

srunx.config.get_config(reload=False)[source]#

Get the global configuration instance.

Return type:

SrunxConfig
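
The reload flag suggests a cached module-level instance that is rebuilt on request. A minimal sketch of that pattern follows, assuming a loader callable is passed in for the sake of a self-contained example (the real function takes only reload); fake_loader is hypothetical.

```python
from typing import Callable, Optional

_cached: Optional[dict] = None


def get_config(loader: Callable[[], dict], reload: bool = False) -> dict:
    """Return the cached configuration, loading it on first use or on reload."""
    global _cached
    if _cached is None or reload:
        _cached = loader()
    return _cached


calls = []


def fake_loader() -> dict:
    """Hypothetical loader that counts how often it is invoked."""
    calls.append(1)
    return {"log_dir": "logs", "load_number": len(calls)}


first = get_config(fake_loader)
second = get_config(fake_loader)            # served from the cache
third = get_config(fake_loader, reload=True)  # forces a fresh load
print(len(calls))
```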

srunx.exceptions module#

exception srunx.exceptions.WorkflowError[source]#

Bases: Exception

Base exception for workflow errors.

exception srunx.exceptions.WorkflowValidationError[source]#

Bases: WorkflowError

Exception raised when workflow validation fails.

exception srunx.exceptions.WorkflowExecutionError[source]#

Bases: WorkflowError

Exception raised when workflow execution fails.

srunx.formatters module#

Unified Slack message formatters with table-based layouts.

class srunx.formatters.SlackTableFormatter[source]#

Bases: object

Format data as ASCII tables for Slack code blocks.

static header(title, timestamp=None)[source]#

Create formatted header.

Parameters:
  • title (str) – Header title with emoji

  • timestamp (datetime | None) – Optional timestamp to display

Return type:

str

Returns:

Formatted header string

static box_title(text, width=40)[source]#

Create box with title.

Parameters:
  • text (str) – Title text

  • width (int) – Box width

Return type:

str

Returns:

Box string

static key_value_table(data, width=40)[source]#

Create key-value table.

Parameters:
  • data (dict[str, str]) – Dictionary of key-value pairs

  • width (int) – Table width

Return type:

str

Returns:

Formatted table string

static data_table(headers, rows, title=None, width=60)[source]#

Create multi-column data table.

Parameters:
  • headers (list[str]) – Column headers

  • rows (list[list[str]]) – Data rows

  • title (str | None) – Optional table title

  • width (int) – Table width

Return type:

str

Returns:

Formatted table string

static progress_bar(value, total, width=10)[source]#

Create progress bar.

Parameters:
  • value (float) – Current value

  • total (float) – Total value

  • width (int) – Bar width in characters

Return type:

str

Returns:

Progress bar string (e.g., “██████░░░░”)
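
A bar of this kind can be produced by scaling the ratio value / total to the bar width. The sketch below is an illustration rather than the library's code; clamping the ratio to the range 0 to 1, rounding to the nearest cell, and returning an empty bar for a non-positive total are all assumptions.

```python
def progress_bar(value: float, total: float, width: int = 10) -> str:
    """Render value/total as filled and empty block characters."""
    if total <= 0:
        return "░" * width  # assumption: nothing to measure against
    ratio = min(max(value / total, 0.0), 1.0)
    filled = round(ratio * width)
    return "█" * filled + "░" * (width - filled)


bar = progress_bar(6, 10)
print(bar)
```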

class srunx.formatters.SlackNotificationFormatter[source]#

Bases: object

Format different notification types with unified style.

__init__()[source]#
job_status_change(job_id, name, old_status, new_status, partition=None, runtime=None, gpus=None, success=True)[source]#

Format job status change notification.

Parameters:
  • job_id (int) – Job ID

  • name (str) – Job name

  • old_status (str) – Previous status

  • new_status (str) – Current status

  • partition (str | None) – SLURM partition

  • runtime (str | None) – Runtime string

  • gpus (int | None) – Number of GPUs

  • success (bool) – Whether the status change is successful

Return type:

str

Returns:

Formatted Slack message

job_status_report(jobs, timestamp=None)[source]#

Format job status report.

Parameters:
  • jobs (list[dict]) – List of job dictionaries with keys: id, name, status, runtime, gpus

  • timestamp (datetime | None) – Report timestamp

Return type:

str

Returns:

Formatted Slack message

resource_available(partition, available_gpus, total_gpus, idle_nodes, total_nodes, utilization)[source]#

Format resource availability notification.

Parameters:
  • partition (str | None) – SLURM partition

  • available_gpus (int) – Number of available GPUs

  • total_gpus (int) – Total GPUs

  • idle_nodes (int) – Number of idle nodes

  • total_nodes (int) – Total nodes

  • utilization (float) – GPU utilization (0-100)

Return type:

str

Returns:

Formatted Slack message

cluster_status(job_stats=None, resource_stats=None, running_jobs=None, timestamp=None)[source]#

Format cluster status report.

Parameters:
  • job_stats (dict | None) – Job statistics dict

  • resource_stats (dict | None) – Resource statistics dict

  • running_jobs (list[dict] | None) – List of running job dicts

  • timestamp (datetime | None) – Report timestamp

Return type:

str

Returns:

Formatted Slack message

srunx.history module#

Job execution history tracking with SQLite.

class srunx.history.JobHistory(db_path=None)[source]#

Bases: object

Manage job execution history in SQLite database.

__init__(db_path=None)[source]#

Initialize job history manager.

Parameters:

db_path (str | Path | None) – Path to SQLite database file. Defaults to ~/.srunx/history.db

record_job(job, workflow_name=None, metadata=None)[source]#

Record a job execution.

Parameters:
  • job (BaseJob | Job | ShellJob) – Job object to record

  • workflow_name (str | None) – Name of the workflow if part of a workflow

  • metadata (dict[str, Any] | None) – Additional metadata to store

Return type:

None

update_job_completion(job_id, status, completed_at=None)[source]#

Update job completion information.

Parameters:
  • job_id (int) – SLURM job ID

  • status (JobStatus) – Final job status

  • completed_at (datetime | None) – Completion timestamp (defaults to now)

Return type:

None

get_recent_jobs(limit=100)[source]#

Get recent job executions.

Parameters:

limit (int) – Maximum number of jobs to return

Return type:

list[dict[str, Any]]

Returns:

List of job records

get_job_stats(from_date=None, to_date=None)[source]#

Get job statistics for a date range.

Parameters:
  • from_date (str | None) – Start date (ISO format)

  • to_date (str | None) – End date (ISO format)

Return type:

dict[str, Any]

Returns:

Dictionary with job statistics

get_workflow_stats(workflow_name)[source]#

Get statistics for a specific workflow.

Parameters:

workflow_name (str) – Name of the workflow

Return type:

dict[str, Any]

Returns:

Dictionary with workflow statistics

srunx.history.get_history(db_path=None)[source]#

Get or create global job history instance.

Parameters:

db_path (str | Path | None) – Path to SQLite database file

Return type:

JobHistory

Returns:

JobHistory instance

srunx.logging module#

Centralized logging configuration for srunx.

srunx.logging.configure_logging(level='INFO', format_string=None, show_time=True, show_level=True, colorize=True)[source]#

Configure loguru logging for srunx.

Parameters:
  • level (str) – Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL).

  • format_string (str | None) – Custom format string. If None, uses default format.

  • show_time (bool) – Whether to show timestamp in logs.

  • show_level (bool) – Whether to show log level in logs.

  • colorize (bool) – Whether to colorize the output.

Return type:

None

srunx.logging.configure_cli_logging(level='INFO', quiet=False)[source]#

Configure logging specifically for CLI usage.

Parameters:
  • level (str) – Logging level.

  • quiet (bool) – If True, only show WARNING and above.

Return type:

None

srunx.logging.configure_workflow_logging(level='INFO')[source]#

Configure logging for workflow execution.

Parameters:

level (str) – Logging level.

Return type:

None

srunx.logging.get_logger(name)[source]#

Get a logger instance for a module.

Parameters:

name (str) – Module name (usually __name__).

Return type:

Logger

Returns:

Logger instance.

srunx.models module#

Data models for SLURM job management.

class srunx.models.JobStatus(*values)[source]#

Bases: Enum

Job status enumeration for both SLURM jobs and workflow jobs.

UNKNOWN = 'UNKNOWN'#
PENDING = 'PENDING'#
RUNNING = 'RUNNING'#
COMPLETED = 'COMPLETED'#
FAILED = 'FAILED'#
CANCELLED = 'CANCELLED'#
TIMEOUT = 'TIMEOUT'#
class srunx.models.DependencyType(*values)[source]#

Bases: Enum

Dependency type enumeration for workflow job dependencies.

AFTER_OK = 'afterok'#
AFTER = 'after'#
AFTER_ANY = 'afterany'#
AFTER_NOT_OK = 'afternotok'#
class srunx.models.JobDependency(**data)[source]#

Bases: BaseModel

Represents a job dependency with type and target job name.

job_name: str#
dep_type: str#
classmethod validate_dep_type(v)[source]#

Validate dependency type, converting to string value.

property dependency_type: DependencyType#

Get the dependency type as a DependencyType enum.

classmethod parse(dep_str)[source]#

Parse a dependency string into a JobDependency.

Supported formats:
  • “job_a” -> afterok:job_a (default behavior)

  • “after:job_a” -> after:job_a

  • “afterany:job_a” -> afterany:job_a

  • “afternotok:job_a” -> afternotok:job_a

  • “afterok:job_a” -> afterok:job_a (explicit)

Return type:

Self
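
The supported formats can be handled with a single split on the first colon. The following sketch returns a plain (dep_type, job_name) tuple instead of a JobDependency so that it runs without srunx; parse_dependency is a hypothetical name, and raising ValueError for an unknown type is an assumption.

```python
VALID_TYPES = ("afterok", "after", "afterany", "afternotok")


def parse_dependency(dep_str: str) -> tuple:
    """Return (dep_type, job_name); a bare job name defaults to afterok."""
    if ":" in dep_str:
        dep_type, job_name = dep_str.split(":", 1)
        if dep_type not in VALID_TYPES:
            raise ValueError(f"Unknown dependency type: {dep_type!r}")
    else:
        dep_type, job_name = "afterok", dep_str
    return dep_type, job_name


for text in ("job_a", "after:job_a", "afternotok:job_a"):
    print(text, "->", parse_dependency(text))
```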

__str__()[source]#

String representation of the dependency.

Return type:

str

model_config: ClassVar[ConfigDict] = {}#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class srunx.models.JobResource(**data)[source]#

Bases: BaseModel

SLURM resource allocation requirements.

nodes: int#
gpus_per_node: int#
ntasks_per_node: int#
cpus_per_task: int#
memory_per_node: str | None#
time_limit: str | None#
nodelist: str | None#
partition: str | None#
model_config: ClassVar[ConfigDict] = {}#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class srunx.models.ContainerResource(**data)[source]#

Bases: BaseModel

Container resource allocation requirements.

Supports Pyxis (--container-* srun flags) and Apptainer/Singularity (apptainer exec command wrapping) runtimes.

Ref (Pyxis): NVIDIA/pyxis

runtime: Literal['pyxis', 'apptainer', 'singularity']#
image: str | None#
mounts: list[str]#
workdir: str | None#
nv: bool#
rocm: bool#
cleanenv: bool#
fakeroot: bool#
writable_tmpfs: bool#
overlay: str | None#
env: dict[str, str]#
validate_runtime_fields()[source]#

Ensure Apptainer-only fields are not set for Pyxis runtime.

Return type:

Self

model_config: ClassVar[ConfigDict] = {}#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class srunx.models.JobEnvironment(**data)[source]#

Bases: BaseModel

Job environment configuration.

conda: str | None#
venv: str | None#
container: ContainerResource | None#
env_vars: dict[str, str]#
validate_environment()[source]#
Return type:

Self

model_config: ClassVar[ConfigDict] = {}#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class srunx.models.BaseJob(**data)[source]#

Bases: BaseModel

name: str#
job_id: int | None#
depends_on: list[str]#
retry: int#
retry_delay: int#
partition: str | None#
user: str | None#
elapsed_time: str | None#
nodes: int | None#
nodelist: str | None#
cpus: int | None#
gpus: int | None#
model_post_init(_BaseJob__context)[source]#

Parse string dependencies into JobDependency objects after initialization.

Return type:

None

property parsed_dependencies: list[JobDependency]#

Get the parsed dependency objects.

property status: JobStatus#

Accessing job.status triggers a lightweight refresh, but only when the job has a job_id and its status is not yet terminal.

refresh(retries=3)[source]#

Query sacct and update _status in-place.

Return type:

Self

dependencies_satisfied(completed_job_names_or_statuses, started_job_names=None, completed_job_names=None)[source]#

Check if all dependencies are satisfied based on their types.

Parameters:
  • completed_job_names_or_statuses (list[str] | dict[str, JobStatus]) – Either list of completed job names (old interface) or dict mapping job names to their current status (new interface)

  • started_job_names (list[str] | None) – List of jobs that have started (for backward compatibility - unused)

  • completed_job_names (list[str] | None) – List of jobs that have completed successfully (for backward compatibility)

Return type:

bool
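
With the status-dictionary interface, each dependency type reduces to a test on the upstream job's current status. The sketch below follows the usual meaning of the corresponding sbatch dependency types; whether srunx treats every status exactly this way (for example, counting a cancelled job as started) is an assumption. JobStatus here is a stand-in that mirrors the values of srunx.models.JobStatus, and dependencies are given as hypothetical (dep_type, job_name) tuples.

```python
from enum import Enum


class JobStatus(Enum):
    """Stand-in mirroring the values of srunx.models.JobStatus."""

    UNKNOWN = "UNKNOWN"
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"
    TIMEOUT = "TIMEOUT"


FINISHED = {JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED, JobStatus.TIMEOUT}
NOT_OK = FINISHED - {JobStatus.COMPLETED}
STARTED = FINISHED | {JobStatus.RUNNING}


def is_satisfied(dep_type: str, status: JobStatus) -> bool:
    """Check one dependency against the upstream job's status."""
    if dep_type == "afterok":
        return status is JobStatus.COMPLETED
    if dep_type == "after":
        return status in STARTED
    if dep_type == "afterany":
        return status in FINISHED
    if dep_type == "afternotok":
        return status in NOT_OK
    raise ValueError(f"Unknown dependency type: {dep_type!r}")


def dependencies_satisfied(deps: list, statuses: dict) -> bool:
    """deps holds (dep_type, job_name) tuples; missing jobs count as UNKNOWN."""
    return all(
        is_satisfied(dep_type, statuses.get(name, JobStatus.UNKNOWN))
        for dep_type, name in deps
    )


statuses = {"prep": JobStatus.COMPLETED, "train": JobStatus.RUNNING}
print(dependencies_satisfied([("afterok", "prep"), ("after", "train")], statuses))
```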

property retry_count: int#

Get the current retry count.

can_retry()[source]#

Check if the job can be retried.

Return type:

bool

increment_retry()[source]#

Increment the retry count.

Return type:

None

reset_retry()[source]#

Reset the retry count.

Return type:

None

should_retry()[source]#

Check if the job should be retried based on status and retry count.

Return type:

bool
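
The retry helpers fit together as a small counter guarded by a limit. The sketch below is one plausible reading, not the library's code: it assumes that the retry field is the maximum number of retries and that only a FAILED status triggers a retry. RetryState is a hypothetical stand-in for the relevant part of BaseJob.

```python
from dataclasses import dataclass


@dataclass
class RetryState:
    """Hypothetical stand-in for the retry-related part of a job."""

    retry: int = 0        # assumption: maximum number of retries allowed
    retry_count: int = 0  # retries used so far
    status: str = "PENDING"

    def can_retry(self) -> bool:
        return self.retry_count < self.retry

    def increment_retry(self) -> None:
        self.retry_count += 1

    def reset_retry(self) -> None:
        self.retry_count = 0

    def should_retry(self) -> bool:
        # Assumption: only failed jobs are retried.
        return self.status == "FAILED" and self.can_retry()


state = RetryState(retry=2, status="FAILED")
attempts = 0
while state.should_retry():
    state.increment_retry()
    attempts += 1
print(attempts)
```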

model_config: ClassVar[ConfigDict] = {}#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class srunx.models.Job(**data)[source]#

Bases: BaseJob

Represents a SLURM job with complete configuration.

command: str | list[str]#
resources: JobResource#
environment: JobEnvironment#
log_dir: str#
work_dir: str#
model_config: ClassVar[ConfigDict] = {}#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

model_post_init(_BaseJob__context)#

Parse string dependencies into JobDependency objects after initialization.

Return type:

None

class srunx.models.ShellJob(**data)[source]#

Bases: BaseJob

script_path: str#
script_vars: dict[str, str | int | float | bool]#
model_config: ClassVar[ConfigDict] = {}#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

model_post_init(_BaseJob__context)#

Parse string dependencies into JobDependency objects after initialization.

Return type:

None

class srunx.models.Workflow(name, jobs=None)[source]#

Bases: object

Represents a workflow containing multiple jobs with dependencies.

__init__(name, jobs=None)[source]#
add(job)[source]#
Return type:

None

remove(job)[source]#
Return type:

None

get(name)[source]#

Get a job by name.

Return type:

Job | ShellJob | None

get_dependencies(job_name)[source]#

Get dependencies for a specific job.

Return type:

list[str]

show()[source]#
validate()[source]#

Validate workflow job dependencies.
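
Dependency validation typically means checking that every referenced job exists and that the graph has no cycles. The page does not say which checks validate() performs, so the sketch below is an assumption about what such a check could look like. It works on a plain mapping of job name to dependency names, and WorkflowValidationError is a local stand-in for the exception in srunx.exceptions.

```python
from graphlib import CycleError, TopologicalSorter


class WorkflowValidationError(Exception):
    """Stand-in for srunx.exceptions.WorkflowValidationError."""


def validate_dependencies(graph: dict) -> list:
    """Return a valid execution order, or raise if the graph is invalid."""
    for name, deps in graph.items():
        for dep in deps:
            if dep not in graph:
                raise WorkflowValidationError(
                    f"Job {name!r} depends on unknown job {dep!r}"
                )
    try:
        # TopologicalSorter expects node -> predecessors, which matches
        # the job -> dependencies mapping used here.
        return list(TopologicalSorter(graph).static_order())
    except CycleError as exc:
        raise WorkflowValidationError(f"Circular dependency: {exc.args[1]}") from exc


order = validate_dependencies({"prep": [], "train": ["prep"], "eval": ["train"]})
print(order)
```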

srunx.models.render_job_script(template_path, job, output_dir=None, verbose=False)[source]#

Render a SLURM job script from a template.

Parameters:
  • template_path (Path | str) – Path to the Jinja template file.

  • job (Job) – Job configuration.

  • output_dir (Path | str | None) – Directory where the generated script will be saved.

  • verbose (bool) – Whether to print the rendered content.

Return type:

str

Returns:

Path to the generated SLURM batch script.

Raises:
  • FileNotFoundError – If the template file does not exist.

  • jinja2.TemplateError – If template rendering fails.

srunx.models.render_shell_job_script(template_path, job, output_dir=None, verbose=False)[source]#

Render a SLURM shell job script from a template.

Parameters:
  • template_path (Path | str) – Path to the Jinja template file.

  • job (ShellJob) – ShellJob configuration.

  • output_dir (Path | str | None) – Directory where the generated script will be saved.

  • verbose (bool) – Whether to print the rendered content.

Return type:

str

Returns:

Path to the generated SLURM batch script.

Raises:
  • FileNotFoundError – If the template file does not exist.

  • jinja2.TemplateError – If template rendering fails.

srunx.runner module#

Workflow runner for executing YAML-defined workflows with SLURM.

class srunx.runner.WorkflowRunner(workflow, callbacks=None, args=None, default_project=None)[source]#

Bases: object

Runner for executing workflows defined in YAML with dynamic job scheduling.

Jobs are executed as soon as their dependencies are satisfied, rather than waiting for entire dependency levels to complete.

__init__(workflow, callbacks=None, args=None, default_project=None)[source]#

Initialize workflow runner.

Parameters:
  • workflow (Workflow) – Workflow to execute.

  • callbacks (Sequence[Callback] | None) – List of callbacks for job notifications.

  • args (dict[str, Any] | None) – Template variables from the YAML args section.

  • default_project (str | None) – Default project (mount name) for file syncing.

classmethod from_yaml(yaml_path, callbacks=None, single_job=None)[source]#

Load and validate a workflow from a YAML file.

Parameters:
  • yaml_path (str | Path) – Path to the YAML workflow definition file.

  • callbacks (Sequence[Callback] | None) – List of callbacks for job notifications.

  • single_job (str | None) – If specified, only load and process this job.

Return type:

Self

Returns:

WorkflowRunner instance with loaded workflow.

Raises:
  • FileNotFoundError – If the YAML file doesn’t exist.

  • yaml.YAMLError – If the YAML is malformed.

  • WorkflowValidationError – If the workflow structure is invalid.

get_independent_jobs()[source]#

Get all jobs that are independent of any other job.

Return type:

list[Job | ShellJob]

run(from_job=None, to_job=None, single_job=None)[source]#

Run a workflow with dynamic job scheduling.

Jobs are executed as soon as their dependencies are satisfied.

Parameters:
  • from_job (str | None) – Start execution from this job (inclusive), ignoring dependencies

  • to_job (str | None) – Stop execution at this job (inclusive)

  • single_job (str | None) – Execute only this specific job, ignoring all dependencies

Return type:

dict[str, Job | ShellJob]

Returns:

Dictionary mapping job names to completed Job instances.
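
The difference between dynamic scheduling and level-by-level execution shows up when jobs at the same level take different amounts of time. The simulation below is a sketch of the idea only: it does not submit anything, job names and durations are made up, and simulate is a hypothetical helper. Each job starts at the moment its last dependency finishes.

```python
import heapq


def simulate(jobs: dict) -> dict:
    """jobs maps name -> (duration, dependencies); returns start time per job."""
    start = {}
    done = set()
    running = []  # min-heap of (finish_time, name)
    now = 0.0
    while len(done) < len(jobs):
        # Launch every job whose dependencies have all finished.
        for name, (duration, deps) in jobs.items():
            if name not in start and all(dep in done for dep in deps):
                start[name] = now
                heapq.heappush(running, (now + duration, name))
        if not running:
            raise ValueError("Unsatisfiable or cyclic dependencies")
        # Advance the clock to the next job that finishes.
        now, finished = heapq.heappop(running)
        done.add(finished)
    return start


starts = simulate({
    "prep_a": (1.0, []),
    "prep_b": (5.0, []),
    "train_a": (2.0, ["prep_a"]),
    "report": (1.0, ["train_a", "prep_b"]),
})
print(starts)
```

In this simulation train_a starts at time 1.0, as soon as prep_a finishes. A level-based runner would hold it back until prep_b also finishes at time 5.0.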

execute_from_yaml(yaml_path)[source]#

Load and execute a workflow from YAML file.

Parameters:

yaml_path (str | Path) – Path to YAML workflow file.

Return type:

dict[str, Job | ShellJob]

Returns:

Dictionary mapping job names to completed Job instances.

static parse_job(data)[source]#
Return type:

Job | ShellJob

srunx.runner.run_workflow_from_file(yaml_path, single_job=None)[source]#

Convenience function to run workflow from YAML file.

Parameters:
  • yaml_path (str | Path) – Path to YAML workflow file.

  • single_job (str | None) – If specified, only run this job.

Return type:

dict[str, Job | ShellJob]

Returns:

Dictionary mapping job names to completed Job instances.

srunx.template module#

Job template management for common use cases.

srunx.template.list_templates()[source]#

List all available templates.

Return type:

list[dict[str, str]]

Returns:

List of template information dictionaries.

srunx.template.get_template_path(template_name)[source]#

Get the path to a template file.

Parameters:

template_name (str) – Name of the template (e.g., ‘pytorch-ddp’)

Return type:

str

Returns:

Path to the template file.

Raises:

ValueError – If template name is not found.

srunx.template.get_template_info(template_name)[source]#

Get information about a specific template.

Parameters:

template_name (str) – Name of the template

Return type:

dict[str, str]

Returns:

Template information dictionary.

Raises:

ValueError – If template name is not found.

srunx.utils module#

Utility functions for SLURM job management.

srunx.utils.get_job_status(job_id)[source]#

Get job status and information.

Parameters:

job_id (int) – SLURM job ID.

Return type:

BaseJob

Returns:

Job object with current status.

Raises:
  • subprocess.CalledProcessError – If status query fails.

  • ValueError – If job information cannot be parsed.

srunx.utils.job_status_msg(job)[source]#

Generate a formatted status message for a job.

Parameters:

job (BaseJob) – Job object to generate message for.

Return type:

str

Returns:

Formatted status message with icons and job information.

Module contents#

srunx - Python library for SLURM job management.

class srunx.Slurm(default_template=None, callbacks=None)[source]#

Bases: object

Client for interacting with SLURM workload manager.

__init__(default_template=None, callbacks=None)[source]#

Initialize SLURM client.

Parameters:
  • default_template (str | None) – Path to default job template.

  • callbacks (Sequence[Callback] | None) – List of callbacks.

cancel(job_id)[source]#

Cancel a SLURM job.

Parameters:

job_id (int) – SLURM job ID to cancel.

Raises:

subprocess.CalledProcessError – If job cancellation fails.

Return type:

None

get_job_output(job_id, job_name=None)[source]#

Get job output from SLURM log files.

Parameters:
  • job_id (int | str) – SLURM job ID

  • job_name (str | None) – Job name for better log file detection

Return type:

tuple[str, str]

Returns:

Tuple of (output_content, error_content)

get_job_output_detailed(job_id, job_name=None, skip_content=False)[source]#

Get detailed job output information including found log files.

Parameters:
  • job_id (int | str) – SLURM job ID

  • job_name (str | None) – Job name for better log file detection

  • skip_content (bool) – If True, only find log files without reading content

Return type:

dict[str, str | list[str] | None]

Returns:

Dictionary with detailed log information

monitor(job_obj_or_id, poll_interval=5, callbacks=None)[source]#

Wait for a job to complete.

Parameters:
  • job_obj_or_id (BaseJob | Job | ShellJob | int) – Job object or job ID.

  • poll_interval (int) – Polling interval in seconds.

  • callbacks (Sequence[Callback] | None) – List of callbacks.

Return type:

BaseJob | Job | ShellJob

Returns:

Completed job object.

Raises:

RuntimeError – If job fails.

queue(user=None)[source]#

List jobs for a user.

Parameters:

user (str | None) – Username (defaults to current user).

Return type:

list[BaseJob]

Returns:

List of Job objects.

static retrieve(job_id)[source]#

Retrieve job information from SLURM.

Parameters:

job_id (int) – SLURM job ID.

Return type:

BaseJob

Returns:

Job object with current status.

run(job, template_path=None, callbacks=None, poll_interval=5, verbose=False, workflow_name=None)[source]#

Submit a job and wait for completion.

Return type:

Job | ShellJob

submit(job, template_path=None, callbacks=None, verbose=False, record_history=True, workflow_name=None)[source]#

Submit a job to SLURM.

Parameters:
  • job (Job | ShellJob) – Job configuration.

  • template_path (str | None) – Optional template path (uses default if not provided).

  • callbacks (Sequence[Callback] | None) – List of callbacks.

  • verbose (bool) – Whether to print the rendered content.

  • record_history (bool) – Whether to record job in history database.

  • workflow_name (str | None) – Name of the workflow if part of a workflow.

Return type:

Job | ShellJob

Returns:

Job instance with updated job_id and status.

Raises:

subprocess.CalledProcessError – If job submission fails.

tail_log(job_id, job_name=None, follow=False, last_n=None, poll_interval=1.0)[source]#

Display job logs with optional real-time streaming.

Parameters:
  • job_id (int | str) – SLURM job ID

  • job_name (str | None) – Job name for better log file detection

  • follow (bool) – If True, continuously stream new log lines (like tail -f)

  • last_n (int | None) – Show only the last N lines

  • poll_interval (float) – Polling interval in seconds for follow mode

Return type:

None
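
The last_n behaviour resembles taking the tail of a text file. The sketch below shows one common way to do that with a bounded deque. The helper name last_lines and the log file name are hypothetical, and srunx may locate and read log files differently.

```python
import tempfile
from collections import deque
from pathlib import Path
from typing import Optional


def last_lines(path: Path, last_n: Optional[int] = None) -> list:
    """Return the last `last_n` lines of a text file (all lines if None)."""
    with path.open() as fh:
        if last_n is None:
            return [line.rstrip("\n") for line in fh]
        # A deque with maxlen keeps only the most recent `last_n` lines.
        return [line.rstrip("\n") for line in deque(fh, maxlen=last_n)]


with tempfile.TemporaryDirectory() as tmp:
    log = Path(tmp) / "job.out"  # hypothetical log file name
    log.write_text("a\nb\nc\nd\n")
    tail = last_lines(log, last_n=2)
    everything = last_lines(log)
```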

srunx.submit_job(job, template_path=None, callbacks=None, verbose=False)[source]#

Submit a job to SLURM (convenience function).

Parameters:
  • job (Job | ShellJob) – Job configuration.

  • template_path (str | None) – Optional template path (uses default if not provided).

  • callbacks (Sequence[Callback] | None) – List of callbacks.

  • verbose (bool) – Whether to print the rendered content.

Return type:

Job | ShellJob

srunx.retrieve_job(job_id)[source]#

Get job status (convenience function).

Parameters:

job_id (int) – SLURM job ID.

Return type:

BaseJob

srunx.cancel_job(job_id)[source]#

Cancel a job (convenience function).

Parameters:

job_id (int) – SLURM job ID.

Return type:

None

class srunx.Callback[source]#

Bases: object

Base callback class for job state notifications.

on_job_cancelled(job)[source]#

Called when a job is cancelled.

Parameters:

job (BaseJob | Job | ShellJob) – Job that was cancelled.

Return type:

None

on_job_completed(job)[source]#

Called when a job completes successfully.

Parameters:

job (BaseJob | Job | ShellJob) – Job that completed.

Return type:

None

on_job_failed(job)[source]#

Called when a job fails.

Parameters:

job (BaseJob | Job | ShellJob) – Job that failed.

Return type:

None

on_job_running(job)[source]#

Called when a job starts running.

Parameters:

job (BaseJob | Job | ShellJob) – Job that started running.

Return type:

None

on_job_submitted(job)[source]#

Called when a job is submitted to SLURM.

Parameters:

job (BaseJob | Job | ShellJob) – Job that was submitted.

Return type:

None

on_resources_available(snapshot)[source]#

Called when resources become available (threshold met).

Parameters:

snapshot (ResourceSnapshot) – Resource snapshot at the time resources became available.

Return type:

None

on_resources_exhausted(snapshot)[source]#

Called when resources are exhausted (below threshold).

Parameters:

snapshot (ResourceSnapshot) – Resource snapshot at the time resources were exhausted.

Return type:

None

on_scheduled_report(report)[source]#

Called when a scheduled report is generated.

Parameters:

report (Report) – Generated report containing job and resource statistics.

Return type:

None

on_workflow_completed(workflow)[source]#

Called when a workflow completes.

Parameters:

workflow (Workflow) – Workflow that completed.

Return type:

None

on_workflow_started(workflow)[source]#

Called when a workflow starts.

Parameters:

workflow (Workflow) – Workflow that started.

Return type:

None
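
Because every hook on the base class returns None, a subclass presumably only needs to override the events it cares about. The sketch below uses a stand-in Callback class so that it runs without srunx installed. RecordingCallback, notify, and the dict used as a job are hypothetical and for illustration only.

```python
class Callback:
    """Stand-in for the base class: every hook is a no-op by default."""

    def on_job_submitted(self, job) -> None:
        pass

    def on_job_completed(self, job) -> None:
        pass

    def on_job_failed(self, job) -> None:
        pass


class RecordingCallback(Callback):
    """Hypothetical subclass that records events instead of sending them."""

    def __init__(self) -> None:
        self.events = []

    def on_job_completed(self, job) -> None:
        self.events.append(("completed", job["name"]))

    def on_job_failed(self, job) -> None:
        self.events.append(("failed", job["name"]))


def notify(callbacks, hook: str, job) -> None:
    """Call the named hook on every callback (hypothetical dispatcher)."""
    for cb in callbacks:
        getattr(cb, hook)(job)


recorder = RecordingCallback()
notify([recorder], "on_job_submitted", {"name": "train"})  # inherited no-op
notify([recorder], "on_job_completed", {"name": "train"})
```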

class srunx.SlackCallback(webhook_url)[source]#

Bases: Callback

Callback that sends notifications to Slack via webhook.

__init__(webhook_url)[source]#

Initialize Slack callback.

Parameters:

webhook_url (str) – Slack webhook URL for sending notifications.

Raises:

ValueError – If webhook_url is not a valid Slack webhook URL.
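
The exact validation rule is not documented here. As a rough sketch only, a check might compare the URL against the usual shape of Slack incoming-webhook URLs (https://hooks.slack.com/services/...). The function name looks_like_slack_webhook is hypothetical, and the real check in srunx may be stricter or looser.

```python
from urllib.parse import urlparse


def looks_like_slack_webhook(url: str) -> bool:
    """Heuristic check (an assumption, not the srunx rule)."""
    parsed = urlparse(url)
    return (
        parsed.scheme == "https"
        and parsed.hostname == "hooks.slack.com"
        and parsed.path.startswith("/services/")
    )


# Placeholder URLs for illustration; neither is a real webhook.
valid = looks_like_slack_webhook("https://hooks.slack.com/services/T000/B000/XXXX")
invalid = looks_like_slack_webhook("http://example.com/hook")
```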

on_job_cancelled(job)[source]#

Send cancellation notification to Slack.

Parameters:

job (BaseJob | Job | ShellJob) – Job that was cancelled.

Return type:

None

on_job_completed(job)[source]#

Send completion notification to Slack.

Parameters:

job (BaseJob | Job | ShellJob) – Job that completed.

Return type:

None

on_job_failed(job)[source]#

Send failure notification to Slack.

Parameters:

job (BaseJob | Job | ShellJob) – Job that failed.

Return type:

None

on_job_running(job)[source]#

Send running notification to Slack.

Parameters:

job (BaseJob | Job | ShellJob) – Job that started running.

Return type:

None

on_job_submitted(job)[source]#

Send submission notification to Slack.

Parameters:

job (BaseJob | Job | ShellJob) – Job that was submitted.

Return type:

None

on_resources_available(snapshot)[source]#

Send resource availability notification to Slack.

Parameters:

snapshot (ResourceSnapshot) – Resource snapshot at the time resources became available.

Return type:

None

on_resources_exhausted(snapshot)[source]#

Send resource exhaustion notification to Slack.

Parameters:

snapshot (ResourceSnapshot) – Resource snapshot at the time resources were exhausted.

Return type:

None

on_scheduled_report(report)[source]#

Send scheduled report to Slack.

Parameters:

report (Report) – Generated report containing job and resource statistics.

Return type:

None

on_workflow_completed(workflow)[source]#

Send completion notification to Slack.

Parameters:

workflow (Workflow) – Workflow that completed.

Return type:

None

class srunx.ContainerRuntime(*args, **kwargs)[source]#

Bases: Protocol

Protocol for container runtime backends.

__init__(*args, **kwargs)#
build_launch_spec(config)[source]#
Return type:

LaunchSpec

class srunx.LaunchSpec(prelude='', srun_args='', launch_prefix='')[source]#

Bases: object

Runtime-agnostic container launch specification.

Three distinct outputs model different injection points in generated scripts:
  • prelude: Shell setup lines executed before the command (e.g., declare arrays)

  • srun_args: Flags passed to srun itself (Pyxis uses this)

  • launch_prefix: Command wrapper prepended to the user command (Apptainer uses this)

__init__(prelude='', srun_args='', launch_prefix='')#
launch_prefix: str = ''#
prelude: str = ''#
srun_args: str = ''#
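
To make the three injection points concrete, the sketch below assembles a script body from a stand-in LaunchSpec dataclass. The build_script_body helper, the image names, and the resulting line layout are assumptions. The templates shipped with srunx may place these pieces differently.

```python
from dataclasses import dataclass


@dataclass
class LaunchSpec:
    """Stand-in mirroring the three documented fields."""

    prelude: str = ""
    srun_args: str = ""
    launch_prefix: str = ""


def build_script_body(spec: LaunchSpec, command: str) -> str:
    """Assemble a script body from a LaunchSpec (hypothetical layout)."""
    lines = []
    if spec.prelude:
        lines.append(spec.prelude)  # setup lines go before the command
    # srun flags sit right after `srun`; the prefix wraps the user command.
    parts = ["srun", spec.srun_args, spec.launch_prefix, command]
    lines.append(" ".join(p for p in parts if p))
    return "\n".join(lines)


# Pyxis-style: flags passed to srun itself (image name is illustrative).
pyxis_body = build_script_body(
    LaunchSpec(srun_args="--container-image=ubuntu:22.04"), "python train.py"
)
# Apptainer-style: a wrapper prepended to the command (image path is illustrative).
apptainer_body = build_script_body(
    LaunchSpec(launch_prefix="apptainer exec image.sif"), "python train.py"
)
```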
class srunx.PyxisRuntime[source]#

Bases: object

Pyxis runtime backend – generates --container-* srun flags.

build_launch_spec(config)[source]#
Return type:

LaunchSpec

class srunx.ApptainerRuntime(binary='apptainer')[source]#

Bases: object

Apptainer/Singularity runtime backend – generates launch_prefix.

__init__(binary='apptainer')[source]#
build_launch_spec(config)[source]#
Return type:

LaunchSpec

srunx.get_runtime(name)[source]#

Return a container runtime backend by name.

Parameters:

name (str) – Runtime identifier – “pyxis”, “apptainer”, or “singularity”.

Return type:

ContainerRuntime

Returns:

A ContainerRuntime implementation.

Raises:

ValueError – If the runtime name is not recognized.
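
A name-based lookup of this kind might look like the sketch below. The classes are empty stand-ins that reuse the documented names so the snippet runs on its own. Passing the runtime name through as the Apptainer binary is an assumption, not documented behaviour.

```python
class PyxisRuntime:
    """Stand-in backend."""


class ApptainerRuntime:
    """Stand-in backend; `binary` selects the executable name."""

    def __init__(self, binary: str = "apptainer") -> None:
        self.binary = binary


def get_runtime(name: str):
    """Hypothetical lookup: map a runtime name to a backend instance."""
    if name == "pyxis":
        return PyxisRuntime()
    if name in ("apptainer", "singularity"):
        # Assumption: the runtime name doubles as the binary name.
        return ApptainerRuntime(binary=name)
    raise ValueError(f"Unknown container runtime: {name!r}")


runtime = get_runtime("singularity")
```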

class srunx.BaseJob(**data)[source]#

Bases: BaseModel

can_retry()[source]#

Check if the job can be retried.

Return type:

bool

dependencies_satisfied(completed_job_names_or_statuses, started_job_names=None, completed_job_names=None)[source]#

Check if all dependencies are satisfied based on their types.

Parameters:
  • completed_job_names_or_statuses (list[str] | dict[str, JobStatus]) – Either list of completed job names (old interface) or dict mapping job names to their current status (new interface)

  • started_job_names (list[str] | None) – List of jobs that have started (for backward compatibility - unused)

  • completed_job_names (list[str] | None) – List of jobs that have completed successfully (for backward compatibility)

Return type:

bool
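
The two call styles (a list of completed names, or a dict of name to status) can be handled with a type check, as in the sketch below. It is a simplified stand-alone function, not the srunx method. It assumes every dependency requires successful completion and ignores other dependency types. Plain strings stand in for JobStatus values.

```python
from typing import Union


def dependencies_satisfied(
    depends_on: list,
    completed_or_statuses: Union[list, dict],
) -> bool:
    """Return True when every name in depends_on has completed."""
    if isinstance(completed_or_statuses, dict):
        # New interface: mapping of job name -> current status.
        return all(
            completed_or_statuses.get(name) == "COMPLETED" for name in depends_on
        )
    # Old interface: plain list of completed job names.
    done = set(completed_or_statuses)
    return all(name in done for name in depends_on)


ok_old = dependencies_satisfied(["prep"], ["prep", "other"])
ok_new = dependencies_satisfied(["prep"], {"prep": "COMPLETED"})
blocked = dependencies_satisfied(["prep"], {"prep": "RUNNING"})
```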

increment_retry()[source]#

Increment the retry count.

Return type:

None

model_config: ClassVar[ConfigDict] = {}#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

model_post_init(_BaseJob__context)[source]#

Parse string dependencies into JobDependency objects after initialization.

Return type:

None

property parsed_dependencies: list[JobDependency]#

Get the parsed dependency objects.

refresh(retries=3)[source]#

Query sacct and update _status in-place.

Return type:

Self

reset_retry()[source]#

Reset the retry count.

Return type:

None

property retry_count: int#

Get the current retry count.

should_retry()[source]#

Check if the job should be retried based on status and retry count.

Return type:

bool
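
The retry helpers (can_retry, increment_retry, reset_retry, should_retry) suggest a simple counter checked against the retry limit. The sketch below is one plausible reading. RetryState is a hypothetical stand-in, and the rule that only a FAILED status triggers a retry is an assumption.

```python
class RetryState:
    """Hypothetical stand-in for the retry bookkeeping on a job."""

    def __init__(self, retry: int) -> None:
        self.retry = retry  # maximum number of retries allowed
        self._retry_count = 0

    def can_retry(self) -> bool:
        return self._retry_count < self.retry

    def increment_retry(self) -> None:
        self._retry_count += 1

    def reset_retry(self) -> None:
        self._retry_count = 0

    def should_retry(self, status: str) -> bool:
        # Assumption: only failed jobs are retried.
        return status == "FAILED" and self.can_retry()


state = RetryState(retry=2)
attempts = 0
while state.should_retry("FAILED"):
    state.increment_retry()
    attempts += 1
```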

property status: JobStatus#

Accessing job.status triggers a lightweight refresh, but only when the job has a job_id and its status is not yet terminal.

name: str#
job_id: int | None#
depends_on: list[str]#
retry: int#
retry_delay: int#
partition: str | None#
user: str | None#
elapsed_time: str | None#
nodes: int | None#
nodelist: str | None#
cpus: int | None#
gpus: int | None#
class srunx.ContainerResource(**data)[source]#

Bases: BaseModel

Container resource allocation requirements.

Supports Pyxis (--container-* srun flags) and Apptainer/Singularity (apptainer exec command wrapping) runtimes.

Ref (Pyxis): NVIDIA/pyxis

model_config: ClassVar[ConfigDict] = {}#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

validate_runtime_fields()[source]#

Ensure Apptainer-only fields are not set for Pyxis runtime.

Return type:

Self

runtime: Literal['pyxis', 'apptainer', 'singularity']#
image: str | None#
mounts: list[str]#
workdir: str | None#
nv: bool#
rocm: bool#
cleanenv: bool#
fakeroot: bool#
writable_tmpfs: bool#
overlay: str | None#
env: dict[str, str]#
class srunx.Job(**data)[source]#

Bases: BaseJob

Represents a SLURM job with complete configuration.

model_config: ClassVar[ConfigDict] = {}#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

model_post_init(_BaseJob__context)#

Parse string dependencies into JobDependency objects after initialization.

Return type:

None

command: str | list[str]#
resources: JobResource#
environment: JobEnvironment#
log_dir: str#
work_dir: str#
name: str#
job_id: int | None#
depends_on: list[str]#
retry: int#
retry_delay: int#
partition: str | None#
user: str | None#
elapsed_time: str | None#
nodes: int | None#
nodelist: str | None#
cpus: int | None#
gpus: int | None#
class srunx.ShellJob(**data)[source]#

Bases: BaseJob

model_config: ClassVar[ConfigDict] = {}#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

model_post_init(_BaseJob__context)#

Parse string dependencies into JobDependency objects after initialization.

Return type:

None

script_path: str#
script_vars: dict[str, str | int | float | bool]#
name: str#
job_id: int | None#
depends_on: list[str]#
retry: int#
retry_delay: int#
partition: str | None#
user: str | None#
elapsed_time: str | None#
nodes: int | None#
nodelist: str | None#
cpus: int | None#
gpus: int | None#
class srunx.JobResource(**data)[source]#

Bases: BaseModel

SLURM resource allocation requirements.

model_config: ClassVar[ConfigDict] = {}#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

nodes: int#
gpus_per_node: int#
ntasks_per_node: int#
cpus_per_task: int#
memory_per_node: str | None#
time_limit: str | None#
nodelist: str | None#
partition: str | None#
class srunx.JobEnvironment(**data)[source]#

Bases: BaseModel

Job environment configuration.

model_config: ClassVar[ConfigDict] = {}#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

validate_environment()[source]#
Return type:

Self

conda: str | None#
venv: str | None#
container: ContainerResource | None#
env_vars: dict[str, str]#
class srunx.JobStatus(*values)[source]#

Bases: Enum

Job status enumeration for both SLURM jobs and workflow jobs.

UNKNOWN = 'UNKNOWN'#
PENDING = 'PENDING'#
RUNNING = 'RUNNING'#
COMPLETED = 'COMPLETED'#
FAILED = 'FAILED'#
CANCELLED = 'CANCELLED'#
TIMEOUT = 'TIMEOUT'#
class srunx.Workflow(name, jobs=None)[source]#

Bases: object

Represents a workflow containing multiple jobs with dependencies.

__init__(name, jobs=None)[source]#
add(job)[source]#
Return type:

None

get(name)[source]#

Get a job by name.

Return type:

Job | ShellJob | None

get_dependencies(job_name)[source]#

Get dependencies for a specific job.

Return type:

list[str]

remove(job)[source]#
Return type:

None

show()[source]#
validate()[source]#

Validate workflow job dependencies.
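
Dependency validation typically means checking that every referenced job exists and that the graph has no cycles. The sketch below does this with the standard-library graphlib module. The function name, the dict-based input, and the use of ValueError are assumptions. srunx may raise its own exception type.

```python
from graphlib import CycleError, TopologicalSorter


def validate_dependencies(deps: dict) -> None:
    """Check a mapping of job name -> list of prerequisite job names."""
    for name, requires in deps.items():
        for dep in requires:
            if dep not in deps:
                raise ValueError(f"job {name!r} depends on unknown job {dep!r}")
    try:
        # prepare() raises CycleError if the graph contains a cycle.
        TopologicalSorter(deps).prepare()
    except CycleError as exc:
        raise ValueError(f"circular dependency: {exc.args[1]}") from exc


validate_dependencies({"prep": [], "train": ["prep"], "eval": ["train"]})
```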

srunx.render_job_script(template_path, job, output_dir=None, verbose=False)[source]#

Render a SLURM job script from a template.

Parameters:
  • template_path (Path | str) – Path to the Jinja template file.

  • job (Job) – Job configuration.

  • output_dir (Path | str | None) – Directory where the generated script will be saved.

  • verbose (bool) – Whether to print the rendered content.

Return type:

str

Returns:

Path to the generated SLURM batch script.

Raises:
  • FileNotFoundError – If the template file does not exist.

  • jinja2.TemplateError – If template rendering fails.

class srunx.JobMonitor(job_ids, target_statuses=None, config=None, callbacks=None, client=None)[source]#

Bases: BaseMonitor

Monitor SLURM jobs until they reach terminal states.

Polls jobs at configured intervals and notifies callbacks on state transitions. Supports monitoring single or multiple jobs with target status detection.

__init__(job_ids, target_statuses=None, config=None, callbacks=None, client=None)[source]#

Initialize job monitor.

Parameters:
  • job_ids (list[int]) – List of SLURM job IDs to monitor.

  • target_statuses (list[JobStatus] | None) – Terminal statuses to wait for. Defaults to [COMPLETED, FAILED, CANCELLED, TIMEOUT].

  • config (MonitorConfig | None) – Monitoring configuration. Defaults to MonitorConfig() if None.

  • callbacks (list[Callback] | None) – List of notification callbacks. Defaults to empty list if None.

  • client (Slurm | None) – SLURM client instance. Defaults to Slurm() if None.

Raises:

ValueError – If job_ids is empty.

check_condition()[source]#

Check if all monitored jobs have reached target statuses.

Return type:

bool

Returns:

True if all jobs have reached a target status, False otherwise.

Raises:

SlurmError – If SLURM command fails.
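
The condition described above reduces to an all() check over the current statuses. The sketch below uses a stand-in JobStatus enum with the documented values. The names all_reached and DEFAULT_TARGETS are hypothetical.

```python
from enum import Enum


class JobStatus(Enum):
    """Stand-in enum with the documented values."""

    UNKNOWN = "UNKNOWN"
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"
    TIMEOUT = "TIMEOUT"


# Documented default target statuses.
DEFAULT_TARGETS = [
    JobStatus.COMPLETED,
    JobStatus.FAILED,
    JobStatus.CANCELLED,
    JobStatus.TIMEOUT,
]


def all_reached(statuses: dict, targets=DEFAULT_TARGETS) -> bool:
    """True if every monitored job is in one of the target statuses."""
    return all(status in targets for status in statuses.values())


# Job IDs here are made up for illustration.
still_running = all_reached({101: JobStatus.COMPLETED, 102: JobStatus.RUNNING})
finished = all_reached({101: JobStatus.COMPLETED, 102: JobStatus.FAILED})
```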

get_current_state()[source]#

Get current state of all monitored jobs.

Return type:

dict[str, Any]

Returns:

Dictionary mapping job IDs (as strings) to their current statuses. Format: {str(job_id): status_value, …}

Raises:

SlurmError – If SLURM command fails.

class srunx.ResourceMonitor(min_gpus, partition=None, config=None, callbacks=None)[source]#

Bases: BaseMonitor

Monitor SLURM GPU resources until availability threshold is met.

Polls partition resources at configured intervals and notifies callbacks when resources become available or exhausted.

__init__(min_gpus, partition=None, config=None, callbacks=None)[source]#

Initialize resource monitor.

Parameters:
  • min_gpus (int) – Minimum number of GPUs required for threshold.

  • partition (str | None) – SLURM partition to monitor. Defaults to all partitions if None.

  • config (MonitorConfig | None) – Monitoring configuration. Defaults to MonitorConfig() if None.

  • callbacks (list[Callback] | None) – List of notification callbacks. Defaults to empty list if None.

Raises:

ValueError – If min_gpus < 0.

check_condition()[source]#

Check if resource availability threshold is met.

Return type:

bool

Returns:

True if available GPUs >= min_gpus threshold, False otherwise.

Raises:

SlurmError – If SLURM command fails.

get_current_state()[source]#

Get current resource state for comparison and logging.

Return type:

dict[str, Any]

Returns:

Dictionary with current resource state. Format: {"partition": str | None, "gpus_available": int, "gpus_total": int, "meets_threshold": bool}

Raises:

SlurmError – If SLURM command fails.

get_partition_resources()[source]#

Query SLURM for GPU resource availability.

Uses sinfo to get total GPUs per partition and squeue to get GPUs in use. Filters out DOWN/DRAIN/DRAINING nodes from availability calculation.

Return type:

ResourceSnapshot

Returns:

ResourceSnapshot with current resource state.

Raises:

SlurmError – If SLURM command fails.
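
The availability arithmetic can be sketched as below, assuming the sinfo and squeue output has already been parsed into (node state, GPU count) pairs and an in-use total. The function name, the input shape, and the exact state matching are assumptions. Real sinfo states can carry suffixes that this sketch does not handle.

```python
# States excluded from the availability calculation, per the description above.
UNAVAILABLE_STATES = {"DOWN", "DRAIN", "DRAINING"}


def available_gpus(nodes: list, gpus_in_use: int) -> tuple:
    """Return (total_gpus, gpus_available) over usable nodes.

    `nodes` is a list of (state, gpu_count) pairs (hypothetical input shape).
    """
    total = sum(
        gpus for state, gpus in nodes if state.upper() not in UNAVAILABLE_STATES
    )
    # Clamp at zero in case usage was counted on a node that has since gone down.
    return total, max(total - gpus_in_use, 0)


nodes = [("IDLE", 8), ("MIXED", 8), ("DOWN", 8)]
total, free = available_gpus(nodes, gpus_in_use=12)
```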

class srunx.MonitorConfig(**data)[source]#

Bases: BaseModel

Configuration for monitoring operations.

class Config[source]#

Bases: object

json_schema_extra = {'examples': [{'mode': 'until', 'notify_on_change': True, 'poll_interval': 60, 'timeout': 3600}, {'mode': 'continuous', 'notify_on_change': True, 'poll_interval': 5, 'timeout': None}]}#
property is_aggressive: bool#

Check if polling interval is aggressive (<5 seconds).

model_config: ClassVar[ConfigDict] = {'json_schema_extra': {'examples': [{'mode': 'until', 'notify_on_change': True, 'poll_interval': 60, 'timeout': 3600}, {'mode': 'continuous', 'notify_on_change': True, 'poll_interval': 5, 'timeout': None}]}}#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

poll_interval: int#
timeout: int | None#
mode: WatchMode#
notify_on_change: bool#
class srunx.ResourceSnapshot(**data)[source]#

Bases: BaseModel

Point-in-time snapshot of SLURM partition resources.

class Config[source]#

Bases: object

json_schema_extra = {'examples': [{'gpus_available': 4, 'gpus_in_use': 12, 'jobs_running': 8, 'nodes_down': 1, 'nodes_idle': 2, 'nodes_total': 8, 'partition': 'gpu', 'timestamp': '2025-12-13T10:30:00', 'total_gpus': 16}]}#
property gpu_utilization: float#

GPU utilization as a fraction (0.0 to 1.0).

property has_available_gpus: bool#

Check if any GPUs are available.

meets_threshold(min_gpus)[source]#

Check if available GPUs meet minimum threshold.

Parameters:

min_gpus (int) – Minimum required GPUs

Return type:

bool

Returns:

True if gpus_available >= min_gpus
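
The derived properties can be illustrated with the values from the schema example (16 GPUs in total, 12 in use, 4 available). Snapshot below is a stand-in dataclass, not the pydantic model. Computing utilization as in-use divided by total, with 0.0 when there are no GPUs, is an assumption.

```python
from dataclasses import dataclass


@dataclass
class Snapshot:
    """Stand-in with the three GPU counters from ResourceSnapshot."""

    total_gpus: int
    gpus_in_use: int
    gpus_available: int

    @property
    def gpu_utilization(self) -> float:
        # Assumption: in-use / total, and 0.0 for a partition without GPUs.
        return self.gpus_in_use / self.total_gpus if self.total_gpus else 0.0

    @property
    def has_available_gpus(self) -> bool:
        return self.gpus_available > 0

    def meets_threshold(self, min_gpus: int) -> bool:
        return self.gpus_available >= min_gpus


snap = Snapshot(total_gpus=16, gpus_in_use=12, gpus_available=4)
utilization = snap.gpu_utilization
```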

model_config: ClassVar[ConfigDict] = {'json_schema_extra': {'examples': [{'gpus_available': 4, 'gpus_in_use': 12, 'jobs_running': 8, 'nodes_down': 1, 'nodes_idle': 2, 'nodes_total': 8, 'partition': 'gpu', 'timestamp': '2025-12-13T10:30:00', 'total_gpus': 16}]}}#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

timestamp: datetime#
partition: str | None#
total_gpus: int#
gpus_in_use: int#
gpus_available: int#
jobs_running: int#
nodes_total: int#
nodes_idle: int#
nodes_down: int#
class srunx.WatchMode(*values)[source]#

Bases: StrEnum

Monitoring mode enumeration.

UNTIL_CONDITION = 'until'#

Monitor until condition is met, then exit

CONTINUOUS = 'continuous'#

Monitor indefinitely, notify on every state change

class srunx.WorkflowRunner(workflow, callbacks=None, args=None, default_project=None)[source]#

Bases: object

Runner for executing workflows defined in YAML with dynamic job scheduling.

Jobs are executed as soon as their dependencies are satisfied, rather than waiting for entire dependency levels to complete.

__init__(workflow, callbacks=None, args=None, default_project=None)[source]#

Initialize workflow runner.

Parameters:
  • workflow (Workflow) – Workflow to execute.

  • callbacks (Sequence[Callback] | None) – List of callbacks for job notifications.

  • args (dict[str, Any] | None) – Template variables from the YAML args section.

  • default_project (str | None) – Default project (mount name) for file syncing.

execute_from_yaml(yaml_path)[source]#

Load and execute a workflow from YAML file.

Parameters:

yaml_path (str | Path) – Path to YAML workflow file.

Return type:

dict[str, Job | ShellJob]

Returns:

Dictionary mapping job names to completed Job instances.

classmethod from_yaml(yaml_path, callbacks=None, single_job=None)[source]#

Load and validate a workflow from a YAML file.

Parameters:
  • yaml_path (str | Path) – Path to the YAML workflow definition file.

  • callbacks (Sequence[Callback] | None) – List of callbacks for job notifications.

  • single_job (str | None) – If specified, only load and process this job.

Return type:

Self

Returns:

WorkflowRunner instance with loaded workflow.

Raises:
  • FileNotFoundError – If the YAML file doesn’t exist.

  • yaml.YAMLError – If the YAML is malformed.

  • WorkflowValidationError – If the workflow structure is invalid.

get_independent_jobs()[source]#

Get all jobs that are independent of any other job.

Return type:

list[Job | ShellJob]

static parse_job(data)[source]#
Return type:

Job | ShellJob

run(from_job=None, to_job=None, single_job=None)[source]#

Run a workflow with dynamic job scheduling.

Jobs are executed as soon as their dependencies are satisfied.

Parameters:
  • from_job (str | None) – Start execution from this job (inclusive), ignoring dependencies

  • to_job (str | None) – Stop execution at this job (inclusive)

  • single_job (str | None) – Execute only this specific job, ignoring all dependencies

Return type:

dict[str, Job | ShellJob]

Returns:

Dictionary mapping job names to completed Job instances.
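
The sketch below simulates the difference between dynamic and level-based scheduling. Each job starts as soon as its own dependencies have finished, so a job is not held back by unrelated long-running jobs. The function name, job names, and durations are made up for illustration, and the graph is assumed to be acyclic.

```python
def start_times(deps: dict, durations: dict) -> dict:
    """Earliest start time of each job when it launches once its own deps finish."""
    starts = {}

    def finish(name: str) -> int:
        return start(name) + durations[name]

    def start(name: str) -> int:
        if name not in starts:
            # A job with no dependencies starts at time 0.
            starts[name] = max((finish(d) for d in deps[name]), default=0)
        return starts[name]

    for name in deps:
        start(name)
    return starts


deps = {"prep": [], "slow": [], "train": ["prep"], "report": ["train", "slow"]}
durations = {"prep": 1, "slow": 10, "train": 3, "report": 1}
starts = start_times(deps, durations)
```

Here train starts at time 1, right after prep finishes. A level-by-level scheduler would hold it until slow finishes at time 10.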

srunx.configure_logging(level='INFO', format_string=None, show_time=True, show_level=True, colorize=True)[source]#

Configure loguru logging for srunx.

Parameters:
  • level (str) – Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL).

  • format_string (str | None) – Custom format string. If None, uses default format.

  • show_time (bool) – Whether to show timestamp in logs.

  • show_level (bool) – Whether to show log level in logs.

  • colorize (bool) – Whether to colorize the output.

Return type:

None

srunx.configure_cli_logging(level='INFO', quiet=False)[source]#

Configure logging specifically for CLI usage.

Parameters:
  • level (str) – Logging level.

  • quiet (bool) – If True, only show WARNING and above.

Return type:

None

srunx.configure_workflow_logging(level='INFO')[source]#

Configure logging for workflow execution.

Parameters:

level (str) – Logging level.

Return type:

None

srunx.get_logger(name)[source]#

Get a logger instance for a module.

Parameters:

name (str) – Module name (usually __name__).

Return type:

Logger

Returns:

Logger instance.