Source code for srunx.web.sync_utils

"""Shared rsync sync utilities for web routers."""

from __future__ import annotations

from pathlib import Path
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from srunx.ssh.core.config import ServerProfile

from srunx.sync.rsync import RsyncClient


[docs] def get_current_profile() -> ServerProfile | None: """Get the current SSH profile from web config or ConfigManager. Checks ``SRUNX_SSH_PROFILE`` (via :func:`get_web_config`) first, then falls back to :meth:`ConfigManager.get_current_profile_name`. Returns ``None`` if no profile is configured. """ from srunx.ssh.core.config import ConfigManager from .config import get_web_config config = get_web_config() cm = ConfigManager() profile_name = config.ssh_profile if not profile_name: profile_name = cm.get_current_profile_name() if not profile_name: return None return cm.get_profile(profile_name)
[docs] def build_rsync_client(profile: ServerProfile) -> RsyncClient: """Create RsyncClient from SSH profile, handling ssh_host vs hostname. When *ssh_host* is set the client delegates all connection parameters (user, key, proxy, port) to ``~/.ssh/config``. """ if profile.ssh_host: return RsyncClient( hostname=profile.ssh_host, username="", ssh_config_path=str(Path.home() / ".ssh" / "config"), ) return RsyncClient( hostname=profile.hostname, username=profile.username, key_filename=profile.key_filename, port=profile.port, proxy_jump=profile.proxy_jump, )
[docs] def sync_mount_by_name(profile: ServerProfile, mount_name: str) -> None: """Sync a named mount's local directory to remote via rsync. Raises: ValueError: If *mount_name* does not exist in the profile. RuntimeError: If the rsync process exits with a non-zero code. """ mount = next((m for m in profile.mounts if m.name == mount_name), None) if mount is None: raise ValueError(f"Mount '{mount_name}' not found in profile") rsync = build_rsync_client(profile) result = rsync.push( mount.local, mount.remote, exclude_patterns=mount.exclude_patterns or None ) if result.returncode != 0: raise RuntimeError( f"rsync sync failed for mount '{mount_name}': {result.stderr}" )
[docs] def resolve_mounts_for_workflow( profile: ServerProfile, jobs_data: list[dict], default_project: str | None = None, ) -> list[str]: """Identify mount names to sync for a workflow's jobs. Matches each job's ``work_dir`` against mount remote paths using longest-prefix matching. Also includes *default_project* if it corresponds to a valid mount. Returns: Deduplicated list of mount names. """ mount_names: set[str] = set() if default_project: if any(m.name == default_project for m in profile.mounts): mount_names.add(default_project) for jd in jobs_data: work_dir = jd.get("work_dir", "") if not work_dir: continue # Find longest prefix match among mounts best_mount: str | None = None best_len = 0 for m in profile.mounts: remote = m.remote.rstrip("/") if work_dir == remote or work_dir.startswith(remote + "/"): if len(remote) > best_len: best_mount = m.name best_len = len(remote) if best_mount: mount_names.add(best_mount) return list(mount_names)