API Reference

Sync Module

srunx.sync.rsync

Rsync-based file synchronization for remote SLURM servers.

logger module-attribute

logger = get_logger(__name__)

RsyncResult dataclass

RsyncResult(returncode: int, stdout: str, stderr: str)

Result of an rsync operation.

RsyncClient

RsyncClient(
    hostname: str,
    username: str,
    port: int = 22,
    key_filename: str | None = None,
    proxy_jump: str | None = None,
    ssh_config_path: str | None = None,
    exclude_patterns: Sequence[str] | None = None,
)

Rsync wrapper for syncing files to/from remote SLURM servers.

Handles SSH connection options (port, key, ProxyJump, ssh_config) and builds rsync commands with sensible defaults for development workflow synchronization.

Source code in src/srunx/sync/rsync.py
def __init__(
    self,
    hostname: str,
    username: str,
    port: int = 22,
    key_filename: str | None = None,
    proxy_jump: str | None = None,
    ssh_config_path: str | None = None,
    exclude_patterns: Sequence[str] | None = None,
) -> None:
    rsync_path = shutil.which("rsync")
    if rsync_path is None:
        raise RuntimeError(
            "rsync is not installed or not found in PATH. "
            "Please install rsync before using RsyncClient."
        )

    self.hostname = hostname
    self.username = username
    self.port = port
    self.key_filename = key_filename
    self.proxy_jump = proxy_jump
    self.ssh_config_path = ssh_config_path

    # Detect rsync capabilities
    self._supports_protect_args = False
    self._supports_mkpath = False
    self._detect_rsync_capabilities(rsync_path)

    # Merge caller-supplied excludes with defaults (no duplicates)
    self.exclude_patterns = list(self.DEFAULT_EXCLUDES)
    if exclude_patterns:
        seen = set(self.exclude_patterns)
        for pattern in exclude_patterns:
            if pattern not in seen:
                self.exclude_patterns.append(pattern)
                seen.add(pattern)

push

push(
    local_path: str | Path,
    remote_path: str | None = None,
    *,
    delete: bool = True,
    dry_run: bool = False,
    itemize: bool = False,
    verbose: bool = False,
    exclude_patterns: Sequence[str] | None = None,
) -> RsyncResult

Sync a local directory/file to the remote server.

Parameters:

  • local_path (str | Path, required): Local file or directory to push.
  • remote_path (str | None, default None): Destination path on the remote server. If None, uses get_default_remote_path().
  • delete (bool, default True): Remove remote files not present locally.
  • dry_run (bool, default False): Perform a trial run with no changes made.
  • itemize (bool, default False): Add --itemize-changes so the result lists every file rsync would (or did) touch, with the standard YXcstpoguax flag prefix. Required for dry_run callers that want a human-readable preview.
  • verbose (bool, default False): Stream rsync's per-file progress to stderr live instead of capturing it silently. Adds --info=progress2 so users with large mounts see progress instead of a frozen terminal.
  • exclude_patterns (Sequence[str] | None, default None): Additional exclude patterns for this call only.

Returns:

  • RsyncResult with returncode, stdout, and stderr.

Source code in src/srunx/sync/rsync.py
def push(
    self,
    local_path: str | Path,
    remote_path: str | None = None,
    *,
    delete: bool = True,
    dry_run: bool = False,
    itemize: bool = False,
    verbose: bool = False,
    exclude_patterns: Sequence[str] | None = None,
) -> RsyncResult:
    """Sync a local directory/file to the remote server.

    Args:
        local_path: Local file or directory to push.
        remote_path: Destination path on the remote server.
            If None, uses ``get_default_remote_path()``.
        delete: Remove remote files not present locally (default True).
        dry_run: Perform a trial run with no changes made.
        itemize: Add ``--itemize-changes`` so the result lists every
            file rsync *would* (or did) touch, with the standard
            ``YXcstpoguax`` flag prefix. Required for ``dry_run``
            callers that want a human-readable preview.
        verbose: Stream rsync's per-file progress to stderr live
            instead of capturing it silently. Adds
            ``--info=progress2`` so users with large mounts see
            progress instead of a frozen terminal.
        exclude_patterns: Additional exclude patterns for this call only.

    Returns:
        RsyncResult with returncode, stdout, and stderr.
    """
    if remote_path is None:
        remote_path = self.get_default_remote_path(local_path)

    local = Path(local_path)
    src = str(local)
    # Trailing slash ensures rsync copies directory *contents*, not the
    # directory itself.
    if local.is_dir() and not src.endswith("/"):
        src += "/"

    dst = self._format_remote(remote_path)

    # Ensure remote directory exists when --mkpath is unavailable
    if not self._supports_mkpath and not dry_run:
        self._ensure_remote_dir(remote_path)

    excludes = self._merge_excludes(exclude_patterns)
    cmd = self._build_rsync_cmd(
        src,
        dst,
        delete=delete,
        dry_run=dry_run,
        itemize=itemize,
        verbose=verbose,
        excludes=excludes,
    )
    if verbose:
        return self._run_rsync_streaming(cmd)
    return self._run_rsync(cmd)

pull

pull(
    remote_path: str,
    local_path: str | Path,
    *,
    delete: bool = False,
    dry_run: bool = False,
    itemize: bool = False,
    exclude_patterns: Sequence[str] | None = None,
) -> RsyncResult

Sync a remote directory/file to the local machine.

Parameters:

  • remote_path (str, required): Source path on the remote server.
  • local_path (str | Path, required): Local destination path.
  • delete (bool, default False): Remove local files not present on the remote.
  • dry_run (bool, default False): Perform a trial run with no changes made.
  • itemize (bool, default False): Add --itemize-changes so the result enumerates every file rsync would (or did) touch.
  • exclude_patterns (Sequence[str] | None, default None): Additional exclude patterns for this call only.

Returns:

  • RsyncResult with returncode, stdout, and stderr.

Source code in src/srunx/sync/rsync.py
def pull(
    self,
    remote_path: str,
    local_path: str | Path,
    *,
    delete: bool = False,
    dry_run: bool = False,
    itemize: bool = False,
    exclude_patterns: Sequence[str] | None = None,
) -> RsyncResult:
    """Sync a remote directory/file to the local machine.

    Args:
        remote_path: Source path on the remote server.
        local_path: Local destination path.
        delete: Remove local files not present on the remote (default False).
        dry_run: Perform a trial run with no changes made.
        itemize: Add ``--itemize-changes`` so the result enumerates
            every file rsync *would* (or did) touch.
        exclude_patterns: Additional exclude patterns for this call only.

    Returns:
        RsyncResult with returncode, stdout, and stderr.
    """
    src = self._format_remote(remote_path)
    dst = str(local_path)

    excludes = self._merge_excludes(exclude_patterns)
    cmd = self._build_rsync_cmd(
        src,
        dst,
        delete=delete,
        dry_run=dry_run,
        itemize=itemize,
        excludes=excludes,
    )
    return self._run_rsync(cmd)

read_remote_file

read_remote_file(remote_path: str) -> str | None

Return the remote file's contents, or None if it doesn't exist.

Used by the per-machine ownership marker (#137 part 4) to read .srunx-owner.json before each sync. The check needs to distinguish "file missing" (legitimate first sync, returns None) from "ssh / network failed" (raise so the caller knows the marker can't be trusted).

Implementation: ssh ... cat -- <path> with a per-file existence test wrapped in a single shell command — keeps the round-trip count to one per check.

Source code in src/srunx/sync/rsync.py
def read_remote_file(self, remote_path: str) -> str | None:
    """Return the remote file's contents, or ``None`` if it doesn't exist.

    Used by the per-machine ownership marker (#137 part 4) to read
    ``.srunx-owner.json`` before each sync. The check needs to
    distinguish "file missing" (legitimate first sync, returns
    ``None``) from "ssh / network failed" (raise so the caller
    knows the marker can't be trusted).

    Implementation: ``ssh ... cat -- <path>`` with a per-file
    existence test wrapped in a single shell command — keeps the
    round-trip count to one per check.
    """
    # ``test -f X && cat X`` returns:
    #   * 0 + stdout: file exists, content returned
    #   * 1 + empty stdout: file does not exist
    #   * 2+ : actual error (permission denied, ssh failure, …)
    # We disambiguate via the exit code so transient failures
    # don't get silently treated as "no marker".
    quoted = shlex.quote(remote_path)
    result = self._ssh_run(f"test -f {quoted} && cat -- {quoted}")
    if result.returncode == 0:
        return result.stdout
    if result.returncode == 1:
        # ``test -f`` returned false — file does not exist.
        return None
    raise RuntimeError(
        f"ssh read of {remote_path!r} failed (exit {result.returncode}): "
        f"{result.stderr.strip()}"
    )

write_remote_file

write_remote_file(remote_path: str, content: str) -> None

Atomically write content to remote_path via ssh + tee.

tee (without -a) truncates and rewrites the destination in one shell op so a concurrent reader either sees the old content or the new content — never a half-written file. The parent directory is assumed to exist (the rsync that just ran guarantees it for the owner-marker case).

Raises :class:RuntimeError if ssh / tee exits non-zero so the caller can surface the failure rather than silently leaving a stale marker on disk.

Source code in src/srunx/sync/rsync.py
def write_remote_file(self, remote_path: str, content: str) -> None:
    """Atomically write *content* to *remote_path* via ssh + tee.

    ``tee`` (without ``-a``) truncates and rewrites the destination
    in one shell op so a concurrent reader either sees the old
    content or the new content — never a half-written file. The
    parent directory is assumed to exist (the rsync that just ran
    guarantees it for the owner-marker case).

    Raises :class:`RuntimeError` if ssh / tee exits non-zero so the
    caller can surface the failure rather than silently leaving a
    stale marker on disk.
    """
    quoted = shlex.quote(remote_path)
    # Use ``> /dev/null`` so tee's stdout doesn't echo the JSON
    # back through ssh (waste of bytes + stdout pollution).
    result = self._ssh_run(f"tee -- {quoted} > /dev/null", stdin=content)
    if result.returncode != 0:
        raise RuntimeError(
            f"ssh write to {remote_path!r} failed "
            f"(exit {result.returncode}): {result.stderr.strip()}"
        )
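
The same tee pattern can be tried locally; write_file_via_tee is a hypothetical stand-in that swaps the ssh hop for a local sh:

```python
import shlex
import subprocess

def write_file_via_tee(path: str, content: str) -> None:
    """Local stand-in for write_remote_file: sh + tee instead of ssh + tee."""
    quoted = shlex.quote(path)
    # "> /dev/null" keeps tee from echoing the content back on stdout.
    result = subprocess.run(
        ["sh", "-c", f"tee -- {quoted} > /dev/null"],
        input=content,
        capture_output=True,
        text=True,
    )
    if result.returncode != 0:
        raise RuntimeError(result.stderr.strip())
```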

remote_sha256

remote_sha256(remote_path: str) -> str | None

Return the SHA-256 hex digest of remote_path, or None.

Used by :func:srunx.sync.hash_verify.verify_paths_match (#137 part 5) to detect the silent-rsync-failure case where rsync exits 0 but the specific file we're about to sbatch never reached the cluster (excluded by a stray rule, lost to a path-translation bug, …). A None return defers to the caller, which decides whether "missing" or "no tool" should block submission.

Returns:

  • The 64-char lowercase hex digest on success.
  • None when the file does not exist on the remote.
  • None when neither sha256sum nor shasum -a 256 is available on the remote PATH (logged at debug — the rsync that just succeeded is the user's main signal).

Raises:

  • RuntimeError: For any other ssh / network failure (connection refused, host key mismatch, host unreachable, …). Callers that want "best effort" can catch and downgrade; the marker-read code in :func:check_owner is the prior art for that pattern.

Source code in src/srunx/sync/rsync.py
def remote_sha256(self, remote_path: str) -> str | None:
    """Return the SHA-256 hex digest of *remote_path*, or ``None``.

    Used by :func:`srunx.sync.hash_verify.verify_paths_match`
    (#137 part 5) to detect the silent-rsync-failure case where
    rsync exits 0 but the specific file we're about to ``sbatch``
    never reached the cluster (excluded by a stray rule, lost to
    a path-translation bug, …). A None return defers to the
    caller, which decides whether "missing" or "no tool" should
    block submission.

    Returns:
        The 64-char lowercase hex digest on success.
        ``None`` when the file does not exist on the remote.
        ``None`` when neither ``sha256sum`` nor ``shasum -a 256``
        is available on the remote PATH (logged at debug — the
        rsync that just succeeded is the user's main signal).

    Raises:
        RuntimeError: For any other ssh / network failure (connection
            refused, host key mismatch, host unreachable, …). Callers
            that want "best effort" can catch and downgrade; the
            marker-read code in :func:`check_owner` is the prior art
            for that pattern.
    """
    quoted = shlex.quote(remote_path)
    # Single round-trip: existence check, then prefer sha256sum
    # (Linux), fall back to shasum -a 256 (macOS / BSD). Custom
    # exit codes disambiguate "file missing" and "no tool" from
    # genuine failures so the Python side doesn't have to grep
    # stderr to make that distinction.
    script = (
        f"test -f {quoted} || exit {self._SHA256_REMOTE_MISSING_EXIT}\n"
        f"if command -v sha256sum >/dev/null 2>&1; then\n"
        f"  sha256sum -- {quoted}\n"
        f"elif command -v shasum >/dev/null 2>&1; then\n"
        f"  shasum -a 256 -- {quoted}\n"
        f"else\n"
        f"  exit {self._SHA256_REMOTE_NO_TOOL_EXIT}\n"
        f"fi"
    )
    result = self._ssh_run(script)
    if result.returncode == 0:
        match = self._SHA256_HEX_RE.match(result.stdout.strip())
        if match is None:
            # Unparseable output is a genuine failure — sha256sum
            # / shasum surfaced something we don't understand,
            # better to fail loud than silently fall through to
            # "no hash".
            raise RuntimeError(
                f"could not parse sha256 output for {remote_path!r}: "
                f"{result.stdout.strip()!r}"
            )
        return match.group(1).lower()
    if result.returncode == self._SHA256_REMOTE_MISSING_EXIT:
        return None
    if result.returncode == self._SHA256_REMOTE_NO_TOOL_EXIT:
        logger.debug(
            "Remote sha256 verification skipped for {}: "
            "neither sha256sum nor shasum available on remote PATH",
            remote_path,
        )
        return None
    raise RuntimeError(
        f"ssh sha256 of {remote_path!r} failed "
        f"(exit {result.returncode}): {result.stderr.strip()}"
    )

get_default_remote_path staticmethod

get_default_remote_path(
    local_path: str | Path | None = None,
) -> str

Derive a default remote workspace path from the git repo or cwd.

Parameters:

  • local_path (str | Path | None, default None): Optional local directory to derive the project name from. If None, uses the current working directory.

Returns:

  • A path like ~/.config/srunx/workspace/<project_name>/.

Source code in src/srunx/sync/rsync.py
@staticmethod
def get_default_remote_path(local_path: str | Path | None = None) -> str:
    """Derive a default remote workspace path from the git repo or cwd.

    Args:
        local_path: Optional local directory to derive the project name
            from. If None, uses the current working directory.

    Returns:
        A path like ``~/.config/srunx/workspace/<project_name>/``.
    """
    cwd = str(Path(local_path)) if local_path else None
    try:
        result = subprocess.run(  # noqa: S603, S607
            ["git", "rev-parse", "--show-toplevel"],
            capture_output=True,
            text=True,
            cwd=cwd,
        )
        if result.returncode == 0:
            basename = Path(result.stdout.strip()).name
        else:
            basename = Path(cwd).name if cwd else Path.cwd().name
    except FileNotFoundError:
        # git not installed
        basename = Path(cwd).name if cwd else Path.cwd().name

    return f"~/.config/srunx/workspace/{basename}/"

get_logger

get_logger(name: str) -> Logger

Get a logger instance for a module.

Parameters:

  • name (str, required): Module name (usually __name__).

Returns:

  • Logger instance.

Source code in src/srunx/common/logging.py
def get_logger(name: str) -> Logger:
    """Get a logger instance for a module.

    Args:
        name: Module name (usually __name__).

    Returns:
        Logger instance.
    """
    return logger.bind(name=name)  # type: ignore

Sweep orchestration

srunx.runtime.sweep.expand

Pure functions for matrix expansion, sweep-spec merging, and CLI flag parsing.

All validation routes through WorkflowValidationError so CLI, Web API, and MCP paths surface a consistent error category.

expand_matrix

expand_matrix(
    matrix: dict[str, list[Any]], base_args: dict[str, Any]
) -> list[dict[str, Any]]

Cross-product of matrix axes merged into base_args.

Axis iteration order follows insertion order of matrix. Matrix values override any identically-keyed entry in base_args (matrix wins at the args level; the deps.<parent>.<key> channel remains a separate space).

Raises:

  • WorkflowValidationError: empty matrix (R2.10), empty axis list (R2.4), non-scalar axis value (R2.5), axis named deps (R2.3), or cell_count > 1000 (R2.8).
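
The documented contract (cross-product in axis insertion order, matrix keys winning over base_args) can be sketched with itertools.product; expand_matrix_sketch is a hypothetical re-implementation with the validation checks omitted:

```python
from itertools import product
from typing import Any

def expand_matrix_sketch(
    matrix: dict[str, list[Any]], base_args: dict[str, Any]
) -> list[dict[str, Any]]:
    """Cross-product of matrix axes merged into base_args (validation omitted)."""
    keys = list(matrix)  # axis iteration order = insertion order
    cells = []
    for values in product(*(matrix[k] for k in keys)):
        cell = dict(base_args)
        cell.update(zip(keys, values))  # matrix wins at the args level
        cells.append(cell)
    return cells
```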

merge_sweep_specs

merge_sweep_specs(
    yaml_sweep: SweepSpec | None,
    cli_sweep_axes: dict[str, list[ScalarValue]],
    cli_arg_overrides: dict[str, str],
    cli_fail_fast: bool | None,
    cli_max_parallel: int | None,
) -> SweepSpec | None

Merge YAML sweep: block with CLI flags at axis granularity.

  • If neither YAML nor CLI provides any matrix axis, returns None (caller runs the non-sweep path).
  • CLI axes replace same-named YAML axes; CLI-only axes are added.
  • --arg KEY colliding with --sweep KEY is rejected (R3.6).
  • The final max_parallel must be set and >= 1 (R2.6).

Raises:

  • WorkflowValidationError: on --arg/--sweep key collision or missing/invalid final max_parallel.

parse_arg_flags

parse_arg_flags(raw: list[str]) -> dict[str, str]

Tokenize --arg KEY=VALUE occurrences.

Rules:

  • Split on the FIRST = (later = characters stay in the value).
  • Duplicate keys: last occurrence wins (R1.2).
  • Missing = raises WorkflowValidationError (R3.8).
  • Values are always strings; no int/float/bool auto-cast (R3.10).
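
Those rules fit in a few lines; parse_arg_flags_sketch is an illustrative tokenizer (ValueError stands in for WorkflowValidationError):

```python
def parse_arg_flags_sketch(raw: list[str]) -> dict[str, str]:
    """Tokenize --arg KEY=VALUE per the documented rules."""
    out: dict[str, str] = {}
    for token in raw:
        if "=" not in token:
            # Stand-in for WorkflowValidationError (R3.8)
            raise ValueError(f"--arg expects KEY=VALUE, got {token!r}")
        key, _, value = token.partition("=")  # split on the FIRST =
        out[key] = value                      # last duplicate wins
    return out
```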

parse_sweep_flags

parse_sweep_flags(raw: list[str]) -> dict[str, list[str]]

Tokenize --sweep KEY=v1,v2,v3 occurrences.

  • Split axis at the first = (axis names cannot contain =).
  • Values are split on , with no escape handling (Phase 1, R3.5).
  • Empty elements (a,,b) are preserved as empty strings (R3.9).
  • Missing = raises WorkflowValidationError (R3.8).
  • Duplicate axis: last occurrence wins (consistent with parse_arg_flags).
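
A matching sketch for the sweep-flag tokenizer (parse_sweep_flags_sketch is an illustrative name; ValueError stands in for WorkflowValidationError):

```python
def parse_sweep_flags_sketch(raw: list[str]) -> dict[str, list[str]]:
    """Tokenize --sweep KEY=v1,v2,v3 per the documented rules."""
    out: dict[str, list[str]] = {}
    for token in raw:
        if "=" not in token:
            # Stand-in for WorkflowValidationError (R3.8)
            raise ValueError(f"--sweep expects KEY=v1,v2, got {token!r}")
        axis, _, values = token.partition("=")   # axis names cannot contain =
        out[axis] = values.split(",")            # no escapes; empties preserved
    return out
```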

srunx.runtime.sweep.orchestrator

SweepOrchestrator: materialize matrix cells and drive them under a semaphore.

See .claude/specs/workflow-parameter-sweep/design.md § SweepOrchestrator and tasks 17-20.

SweepOrchestrator

SweepOrchestrator(
    *,
    workflow_yaml_path: Path | None,
    workflow_data: dict[str, Any],
    args_override: dict[str, Any] | None,
    sweep_spec: SweepSpec,
    submission_source: Literal["cli", "web", "mcp"],
    callbacks: Sequence[Callback] | None = None,
    endpoint_id: int | None = None,
    preset: str = "terminal",
    executor_factory: WorkflowJobExecutorFactory
    | None = None,
    submission_context: SubmissionRenderContext
    | None = None,
)

Drive sweep execution: materialize cells, run them, aggregate status.

run

run() -> SweepRun

Execute the sweep synchronously and return the final SweepRun.

materialize

materialize() -> int

Expand + materialize cells synchronously; return sweep_run_id.

Separated from :meth:arun so HTTP callers can materialize inside the request (to obtain sweep_run_id) and then spawn :meth:arun_from_materialized as a background task.

arun_from_materialized async

arun_from_materialized(sweep_run_id: int) -> SweepRun

Run the execution loop for an already-materialized sweep.

Assumes :meth:materialize (or equivalent) populated self._cells and self._sweep_run_id. Used by both :meth:arun (materialize + run in the same call) and the Web dispatcher (materialize synchronously, then spawn this as a background task).

arun async

arun() -> SweepRun

Execute the sweep with bounded concurrency.

Steps: expand → materialize → spawn N cells behind an anyio.Semaphore(min(max_parallel, cell_count)) → return the final SweepRun row.

resume_from_db async

resume_from_db(
    sweep_run_id: int, pending_cells: list[CellSpec]
) -> SweepRun

Resume a sweep whose cells were already materialized.

Used by :class:srunx.runtime.sweep.reconciler.SweepReconciler to spawn orchestrator tasks after a crash. Skips expand + materialize; the caller provides the already-materialized pending cells.

request_cancel

request_cancel() -> None

Mark the sweep as cancel-requested and drain pending cells.

Idempotent: a second call is a no-op because _cancelled is set and SweepRunRepository.request_cancel guards on cancel_requested_at IS NULL.

get_active_orchestrator

get_active_orchestrator(
    sweep_run_id: int,
) -> SweepOrchestrator | None

Return the live orchestrator for sweep_run_id or None.

drain_sweep_pending_cells

drain_sweep_pending_cells(sweep_run_id: int) -> int

Cancel every still-pending cell for sweep_run_id and sync counters.

Runs in a single IMMEDIATE TX on a fresh connection. After the drain it triggers the aggregator so the sweep can transition to its final status if every in-flight cell is already done. Returns the number of cells moved from pending to cancelled (0 when nothing was pending).

This is the out-of-process drain used by the cancel endpoint when no in-process orchestrator is registered (crash-recovery path) and by :class:SweepOrchestrator itself via :meth:SweepOrchestrator._drain.

Security helpers

srunx.runtime.security.python_args

Reject python: prefix in user-supplied args / matrix values.

Runs on Web API submission (YAML + JSON) and MCP tool calls. The python: prefix in args is a server-side evaluation escape hatch reserved for CLI-local use; exposing it over transport boundaries is a security concern (remote code execution via workflow mutation). See :func:srunx.runtime.workflow.loader._has_python_prefix for the CLI-side parser that actually evaluates these values — the check here mirrors its matching rules (prefix match, leading-whitespace-tolerant, case-insensitive).

PythonPrefixViolation dataclass

PythonPrefixViolation(source: str, path: str, value: str)

Structured violation record. Caller converts to transport-specific error.

source instance-attribute

source: str

Logical origin of the payload (e.g. "args", "sweep.matrix").

path instance-attribute

path: str

Dotted / indexed path to the offending value (e.g. "x[2]", "lr").

value instance-attribute

value: str

The offending value, reproduced for error messaging.

find_python_prefix

find_python_prefix(
    payload: Any, *, source: str, _path: str = ""
) -> PythonPrefixViolation | None

Recursively scan a dict / list / scalar payload; return the first violation.

Traverses nested dict -> list -> str. Non-string scalars (int / float / bool / None) are ignored. Returns None when no violation is found.

srunx.runtime.security.mount_paths

Mount-root guard for ShellJob script paths.

Both the Web API and the MCP tool surface the same attack shape: a workflow YAML can declare template: shell with an arbitrary script_path that render_shell_job_script then reads verbatim. Without a guard, a caller could exfiltrate or inject arbitrary host files via e.g. script_path: ../../../etc/passwd.

The helper here returns a structured ShellJobScriptViolation so each transport caller can raise the right exception type (Web → HTTPException(403), MCP → ValueError) while the actual directory-check logic lives in one place.

ShellJobScriptViolation dataclass

ShellJobScriptViolation(job_name: str, script_path: str)

First ShellJob whose script path escapes the allowed mount roots.

find_shell_script_violation

find_shell_script_violation(
    workflow: Workflow, mount_roots: Iterable[Path]
) -> ShellJobScriptViolation | None

Return the first ShellJob pointing outside every mount_roots entry.

mount_roots must already be resolved absolute paths (see :meth:pathlib.Path.resolve). Non-ShellJob entries are ignored. Returns None when every ShellJob's script_path is contained in at least one root.

SLURM protocol constants

srunx.slurm.states

SLURM protocol-level state constants.

Separate from :class:srunx.domain.JobStatus (the domain-level enum) because SLURM's raw state vocabulary is wider than what srunx models: SLURM emits NODE_FAIL / PREEMPTED / OUT_OF_MEMORY for terminal failures that srunx currently collapses into FAILED at the domain boundary. Keeping these strings in a single module lets every caller that speaks SLURM-native states (notification preset filter, active-watch poller, web SSH adapter) agree on the set without risking drift when a new terminal state is added.