diff --git a/src/pkgmgr/actions/install/installers/nix/conflicts.py b/src/pkgmgr/actions/install/installers/nix/conflicts.py new file mode 100644 index 0000000..a7c3486 --- /dev/null +++ b/src/pkgmgr/actions/install/installers/nix/conflicts.py @@ -0,0 +1,100 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, List + +from .profile import NixProfileInspector +from .retry import GitHubRateLimitRetry +from .runner import CommandRunner +from .textparse import NixConflictTextParser + +if TYPE_CHECKING: + from pkgmgr.actions.install.context import RepoContext + + +class NixConflictResolver: + """ + Resolves nix profile file conflicts by: + 1. Parsing conflicting store paths from stderr + 2. Mapping them to profile remove tokens via `nix profile list --json` + 3. Removing those tokens deterministically + 4. Retrying install + """ + + def __init__( + self, + runner: CommandRunner, + retry: GitHubRateLimitRetry, + profile: NixProfileInspector, + ) -> None: + self._runner = runner + self._retry = retry + self._profile = profile + self._parser = NixConflictTextParser() + + def resolve( + self, + ctx: "RepoContext", + install_cmd: str, + stdout: str, + stderr: str, + *, + output: str, + max_rounds: int = 10, + ) -> bool: + quiet = bool(getattr(ctx, "quiet", False)) + combined = f"{stdout}\n{stderr}" + + for _ in range(max_rounds): + # 1) Extract conflicting store prefixes from nix error output + store_prefixes = self._parser.existing_store_prefixes(combined) + + # 2) Resolve them to concrete remove tokens + tokens: List[str] = self._profile.find_remove_tokens_for_store_prefixes( + ctx, + self._runner, + store_prefixes, + ) + + # 3) Fallback: output-name based lookup (also covers nix suggesting: `nix profile remove pkgmgr`) + if not tokens: + tokens = self._profile.find_remove_tokens_for_output(ctx, self._runner, output) + + if tokens: + if not quiet: + print( + "[nix] conflict detected; removing existing profile entries: " + + ", ".join(tokens) + ) + + for t in tokens: + # tokens may contain things like "pkgmgr" or "pkgmgr-1" or quoted tokens (we keep raw) + self._runner.run(ctx, f"nix profile remove {t}", allow_failure=True) + + res = self._retry.run_with_retry(ctx, self._runner, install_cmd) + if res.returncode == 0: + return True + + combined = f"{res.stdout}\n{res.stderr}" + continue + + # 4) Last-resort fallback: use textual remove tokens from stderr (“nix profile remove X”) + tokens = self._parser.remove_tokens(combined) + if tokens: + if not quiet: + print("[nix] fallback remove tokens: " + ", ".join(tokens)) + + for t in tokens: + self._runner.run(ctx, f"nix profile remove {t}", allow_failure=True) + + res = self._retry.run_with_retry(ctx, self._runner, install_cmd) + if res.returncode == 0: + return True + + combined = f"{res.stdout}\n{res.stderr}" + continue + + if not quiet: + print("[nix] conflict detected but could not resolve profile entries to remove.") + return False + + return False diff --git a/src/pkgmgr/actions/install/installers/nix/installer.py b/src/pkgmgr/actions/install/installers/nix/installer.py index 6b83275..36c4174 100644 --- a/src/pkgmgr/actions/install/installers/nix/installer.py +++ b/src/pkgmgr/actions/install/installers/nix/installer.py @@ -1,12 +1,12 @@ -# src/pkgmgr/actions/install/installers/nix/installer.py from __future__ import annotations import os import shutil -from typing import List, Tuple, TYPE_CHECKING +from typing import TYPE_CHECKING, List, Tuple from pkgmgr.actions.install.installers.base import BaseInstaller +from .conflicts import NixConflictResolver from .profile import NixProfileInspector from .retry import GitHubRateLimitRetry, RetryPolicy from .runner import CommandRunner @@ -14,6 +14,7 @@ from .runner import CommandRunner if TYPE_CHECKING: from pkgmgr.actions.install.context import RepoContext + class NixFlakeInstaller(BaseInstaller): layer = "nix" FLAKE_FILE = "flake.nix" @@ -22,15 +23,18 @@ class NixFlakeInstaller(BaseInstaller): self._runner = CommandRunner() self._retry = GitHubRateLimitRetry(policy=policy) self._profile = NixProfileInspector() + self._conflicts = NixConflictResolver(self._runner, self._retry, self._profile) - # ------------------------------------------------------------------ # - # Compatibility: supports() - # ------------------------------------------------------------------ # + # Newer nix rejects numeric indices; we learn this at runtime and cache the decision. + self._indices_supported: bool | None = None def supports(self, ctx: "RepoContext") -> bool: if os.environ.get("PKGMGR_DISABLE_NIX_FLAKE_INSTALLER") == "1": if not ctx.quiet: - print("[INFO] PKGMGR_DISABLE_NIX_FLAKE_INSTALLER=1 – skipping NixFlakeInstaller.") + print( + "[INFO] PKGMGR_DISABLE_NIX_FLAKE_INSTALLER=1 – " + "skipping NixFlakeInstaller." + ) return False if shutil.which("nix") is None: @@ -38,20 +42,12 @@ class NixFlakeInstaller(BaseInstaller): return os.path.exists(os.path.join(ctx.repo_dir, self.FLAKE_FILE)) - # ------------------------------------------------------------------ # - # Compatibility: output selection - # ------------------------------------------------------------------ # - def _profile_outputs(self, ctx: "RepoContext") -> List[Tuple[str, bool]]: # (output_name, allow_failure) if ctx.identifier in {"pkgmgr", "package-manager"}: return [("pkgmgr", False), ("default", True)] return [("default", False)] - # ------------------------------------------------------------------ # - # Compatibility: run() - # ------------------------------------------------------------------ # - def run(self, ctx: "RepoContext") -> None: if not self.supports(ctx): return @@ -59,11 +55,12 @@ class NixFlakeInstaller(BaseInstaller): outputs = self._profile_outputs(ctx) if not ctx.quiet: - print( + msg = ( "[nix] flake detected in " f"{ctx.identifier}, ensuring outputs: " + ", ".join(name for name, _ in outputs) ) + print(msg) for output, allow_failure in outputs: if ctx.force_update: @@ -71,13 +68,13 @@ class NixFlakeInstaller(BaseInstaller): else: self._install_only(ctx, output, allow_failure) - # ------------------------------------------------------------------ # - # Core logic (unchanged semantics) - # ------------------------------------------------------------------ # - def _installable(self, ctx: "RepoContext", output: str) -> str: return f"{ctx.repo_dir}#{output}" + # --------------------------------------------------------------------- + # Core install path + # --------------------------------------------------------------------- + def _install_only(self, ctx: "RepoContext", output: str, allow_failure: bool) -> None: install_cmd = f"nix profile install {self._installable(ctx, output)}" @@ -85,35 +82,56 @@ class NixFlakeInstaller(BaseInstaller): print(f"[nix] install: {install_cmd}") res = self._retry.run_with_retry(ctx, self._runner, install_cmd) - if res.returncode == 0: if not ctx.quiet: print(f"[nix] output '{output}' successfully installed.") return + # Conflict resolver first (handles the common “existing package already provides file” case) + if self._conflicts.resolve( + ctx, + install_cmd, + res.stdout, + res.stderr, + output=output, + ): + if not ctx.quiet: + print(f"[nix] output '{output}' successfully installed after conflict cleanup.") + return + if not ctx.quiet: print( f"[nix] install failed for '{output}' (exit {res.returncode}), " - "trying index-based upgrade/remove+install..." + "trying upgrade/remove+install..." ) - indices = self._profile.find_installed_indices_for_output(ctx, self._runner, output) + # If indices are supported, try legacy index-upgrade path. + if self._indices_supported is not False: + indices = self._profile.find_installed_indices_for_output(ctx, self._runner, output) - upgraded = False - for idx in indices: - if self._upgrade_index(ctx, idx): - upgraded = True - if not ctx.quiet: - print(f"[nix] output '{output}' successfully upgraded (index {idx}).") + upgraded = False + for idx in indices: + if self._upgrade_index(ctx, idx): + upgraded = True + if not ctx.quiet: + print(f"[nix] output '{output}' successfully upgraded (index {idx}).") - if upgraded: - return + if upgraded: + return - if indices and not ctx.quiet: - print(f"[nix] upgrade failed; removing indices {indices} and reinstalling '{output}'.") + if indices and not ctx.quiet: + print(f"[nix] upgrade failed; removing indices {indices} and reinstalling '{output}'.") - for idx in indices: - self._remove_index(ctx, idx) + for idx in indices: + self._remove_index(ctx, idx) + + # If we learned indices are unsupported, immediately fall back below + if self._indices_supported is False: + self._remove_tokens_for_output(ctx, output) + + else: + # indices explicitly unsupported + self._remove_tokens_for_output(ctx, output) final = self._runner.run(ctx, install_cmd, allow_failure=True) if final.returncode == 0: @@ -122,17 +140,24 @@ class NixFlakeInstaller(BaseInstaller): return print(f"[ERROR] Failed to install Nix flake output '{output}' (exit {final.returncode})") - if not allow_failure: raise SystemExit(final.returncode) print(f"[WARNING] Continuing despite failure of optional output '{output}'.") - # ------------------------------------------------------------------ # - # force_update path (unchanged semantics) - # ------------------------------------------------------------------ # + # --------------------------------------------------------------------- + # force_update path + # --------------------------------------------------------------------- def _force_upgrade_output(self, ctx: "RepoContext", output: str, allow_failure: bool) -> None: + # Prefer token path if indices unsupported (new nix) + if self._indices_supported is False: + self._remove_tokens_for_output(ctx, output) + self._install_only(ctx, output, allow_failure) + if not ctx.quiet: + print(f"[nix] output '{output}' successfully upgraded.") + return + indices = self._profile.find_installed_indices_for_output(ctx, self._runner, output) upgraded_any = False @@ -143,7 +168,8 @@ class NixFlakeInstaller(BaseInstaller): print(f"[nix] output '{output}' successfully upgraded (index {idx}).") if upgraded_any: - print(f"[nix] output '{output}' successfully upgraded.") + if not ctx.quiet: + print(f"[nix] output '{output}' successfully upgraded.") return if indices and not ctx.quiet: @@ -152,17 +178,52 @@ class NixFlakeInstaller(BaseInstaller): for idx in indices: self._remove_index(ctx, idx) + # If we learned indices are unsupported, also remove by token to actually clear conflicts + if self._indices_supported is False: + self._remove_tokens_for_output(ctx, output) + self._install_only(ctx, output, allow_failure) - print(f"[nix] output '{output}' successfully upgraded.") + if not ctx.quiet: + print(f"[nix] output '{output}' successfully upgraded.") - # ------------------------------------------------------------------ # + # --------------------------------------------------------------------- # Helpers - # ------------------------------------------------------------------ # + # --------------------------------------------------------------------- + + def _stderr_says_indices_unsupported(self, stderr: str) -> bool: + s = (stderr or "").lower() + return "no longer supports indices" in s or "does not support indices" in s def _upgrade_index(self, ctx: "RepoContext", idx: int) -> bool: - res = self._runner.run(ctx, f"nix profile upgrade --refresh {idx}", allow_failure=True) + cmd = f"nix profile upgrade --refresh {idx}" + res = self._runner.run(ctx, cmd, allow_failure=True) + + if self._stderr_says_indices_unsupported(getattr(res, "stderr", "")): + self._indices_supported = False + return False + + if self._indices_supported is None: + self._indices_supported = True + return res.returncode == 0 def _remove_index(self, ctx: "RepoContext", idx: int) -> None: - self._runner.run(ctx, f"nix profile remove {idx}", allow_failure=True) + res = self._runner.run(ctx, f"nix profile remove {idx}", allow_failure=True) + + if self._stderr_says_indices_unsupported(getattr(res, "stderr", "")): + self._indices_supported = False + + if self._indices_supported is None: + self._indices_supported = True + + def _remove_tokens_for_output(self, ctx: "RepoContext", output: str) -> None: + tokens = self._profile.find_remove_tokens_for_output(ctx, self._runner, output) + if not tokens: + return + + if not ctx.quiet: + print(f"[nix] indices unsupported; removing by token(s): {', '.join(tokens)}") + + for t in tokens: + self._runner.run(ctx, f"nix profile remove {t}", allow_failure=True) diff --git a/src/pkgmgr/actions/install/installers/nix/profile.py b/src/pkgmgr/actions/install/installers/nix/profile.py deleted file mode 100644 index c733909..0000000 --- a/src/pkgmgr/actions/install/installers/nix/profile.py +++ /dev/null @@ -1,71 +0,0 @@ -from __future__ import annotations - -import json -from typing import Any, List, TYPE_CHECKING - - -if TYPE_CHECKING: - from pkgmgr.actions.install.context import RepoContext - from .runner import CommandRunner - -class NixProfileInspector: - """ - Reads and interprets `nix profile list --json` and provides helpers for - finding indices matching a given output name. - """ - - def find_installed_indices_for_output(self, ctx: "RepoContext", runner: "CommandRunner", output: str) -> List[int]: - res = runner.run(ctx, "nix profile list --json", allow_failure=True) - if res.returncode != 0: - return [] - - try: - data = json.loads(res.stdout or "{}") - except json.JSONDecodeError: - return [] - - indices: List[int] = [] - - elements = data.get("elements") - if isinstance(elements, dict): - for idx_str, elem in elements.items(): - try: - idx = int(idx_str) - except (TypeError, ValueError): - continue - if self._element_matches_output(elem, output): - indices.append(idx) - return sorted(indices) - - if isinstance(elements, list): - for elem in elements: - idx = elem.get("index") if isinstance(elem, dict) else None - if isinstance(idx, int) and self._element_matches_output(elem, output): - indices.append(idx) - return sorted(indices) - - return [] - - @staticmethod - def element_matches_output(elem: Any, output: str) -> bool: - return NixProfileInspector._element_matches_output(elem, output) - - @staticmethod - def _element_matches_output(elem: Any, output: str) -> bool: - out = (output or "").strip() - if not out or not isinstance(elem, dict): - return False - - candidates: List[str] = [] - for k in ("attrPath", "originalUrl", "url", "storePath", "name"): - v = elem.get(k) - if isinstance(v, str) and v: - candidates.append(v) - - for c in candidates: - if c == out: - return True - if f"#{out}" in c: - return True - - return False diff --git a/src/pkgmgr/actions/install/installers/nix/profile/__init__.py b/src/pkgmgr/actions/install/installers/nix/profile/__init__.py new file mode 100644 index 0000000..1b221f3 --- /dev/null +++ b/src/pkgmgr/actions/install/installers/nix/profile/__init__.py @@ -0,0 +1,4 @@ +from .inspector import NixProfileInspector +from .models import NixProfileEntry + +__all__ = ["NixProfileInspector", "NixProfileEntry"] diff --git a/src/pkgmgr/actions/install/installers/nix/profile/inspector.py b/src/pkgmgr/actions/install/installers/nix/profile/inspector.py new file mode 100644 index 0000000..6daf83a --- /dev/null +++ b/src/pkgmgr/actions/install/installers/nix/profile/inspector.py @@ -0,0 +1,162 @@ +from __future__ import annotations + +from typing import Any, List, TYPE_CHECKING + +from .matcher import ( + entry_matches_output, + entry_matches_store_path, + stable_unique_ints, +) +from .normalizer import normalize_elements +from .parser import parse_profile_list_json +from .result import extract_stdout_text + +if TYPE_CHECKING: + # Keep these as TYPE_CHECKING-only to avoid runtime import cycles. + from pkgmgr.actions.install.context import RepoContext + from pkgmgr.core.command.runner import CommandRunner + + +class NixProfileInspector: + """ + Reads and inspects the user's Nix profile list (JSON). + + Public API: + - list_json() + - find_installed_indices_for_output() (legacy; may not work on newer nix) + - find_indices_by_store_path() (legacy; may not work on newer nix) + - find_remove_tokens_for_output() + - find_remove_tokens_for_store_prefixes() + """ + + def list_json(self, ctx: "RepoContext", runner: "CommandRunner") -> dict[str, Any]: + res = runner.run(ctx, "nix profile list --json", allow_failure=False) + raw = extract_stdout_text(res) + return parse_profile_list_json(raw) + + # --------------------------------------------------------------------- + # Legacy index helpers (still useful on older nix; newer nix may reject indices) + # --------------------------------------------------------------------- + + def find_installed_indices_for_output( + self, + ctx: "RepoContext", + runner: "CommandRunner", + output: str, + ) -> List[int]: + data = self.list_json(ctx, runner) + entries = normalize_elements(data) + + hits: List[int] = [] + for e in entries: + if e.index is None: + continue + if entry_matches_output(e, output): + hits.append(e.index) + + return stable_unique_ints(hits) + + def find_indices_by_store_path( + self, + ctx: "RepoContext", + runner: "CommandRunner", + store_path: str, + ) -> List[int]: + needle = (store_path or "").strip() + if not needle: + return [] + + data = self.list_json(ctx, runner) + entries = normalize_elements(data) + + hits: List[int] = [] + for e in entries: + if e.index is None: + continue + if entry_matches_store_path(e, needle): + hits.append(e.index) + + return stable_unique_ints(hits) + + # --------------------------------------------------------------------- + # New token-based helpers (works with newer nix where indices are rejected) + # --------------------------------------------------------------------- + + def find_remove_tokens_for_output( + self, + ctx: "RepoContext", + runner: "CommandRunner", + output: str, + ) -> List[str]: + """ + Returns profile remove tokens to remove entries matching a given output. + + We always include the raw output token first because nix itself suggests: + nix profile remove pkgmgr + """ + out = (output or "").strip() + if not out: + return [] + + data = self.list_json(ctx, runner) + entries = normalize_elements(data) + + tokens: List[str] = [out] # critical: matches nix's own suggestion for conflicts + + for e in entries: + if entry_matches_output(e, out): + # Prefer removing by key/name (non-index) when possible. + # New nix rejects numeric indices; these tokens are safer. + k = (e.key or "").strip() + n = (e.name or "").strip() + + if k and not k.isdigit(): + tokens.append(k) + elif n and not n.isdigit(): + tokens.append(n) + + # stable unique preserving order + seen: set[str] = set() + uniq: List[str] = [] + for t in tokens: + if t and t not in seen: + uniq.append(t) + seen.add(t) + return uniq + + def find_remove_tokens_for_store_prefixes( + self, + ctx: "RepoContext", + runner: "CommandRunner", + prefixes: List[str], + ) -> List[str]: + """ + Returns remove tokens for entries whose store path matches any prefix. + """ + prefixes = [(p or "").strip() for p in (prefixes or []) if p] + prefixes = [p for p in prefixes if p] + if not prefixes: + return [] + + data = self.list_json(ctx, runner) + entries = normalize_elements(data) + + tokens: List[str] = [] + for e in entries: + if not e.store_paths: + continue + if any(sp == p for sp in e.store_paths for p in prefixes): + k = (e.key or "").strip() + n = (e.name or "").strip() + if k and not k.isdigit(): + tokens.append(k) + elif n and not n.isdigit(): + tokens.append(n) + + seen: set[str] = set() + uniq: List[str] = [] + for t in tokens: + if t and t not in seen: + uniq.append(t) + seen.add(t) + return uniq diff --git a/src/pkgmgr/actions/install/installers/nix/profile/matcher.py b/src/pkgmgr/actions/install/installers/nix/profile/matcher.py new file mode 100644 index 0000000..6d747c0 --- /dev/null +++ b/src/pkgmgr/actions/install/installers/nix/profile/matcher.py @@ -0,0 +1,62 @@ +from __future__ import annotations + +from typing import List + +from .models import NixProfileEntry + + +def entry_matches_output(entry: NixProfileEntry, output: str) -> bool: + """ + Heuristic matcher: output is typically a flake output name (e.g. "pkgmgr"), + and we match against name/attrPath patterns. + """ + out = (output or "").strip() + if not out: + return False + + candidates = [entry.name, entry.attr_path] + + for c in candidates: + c = (c or "").strip() + if not c: + continue + + # Direct match + if c == out: + return True + + # AttrPath contains "#" + if f"#{out}" in c: + return True + + # AttrPath ends with "." + if c.endswith(f".{out}"): + return True + + # Name pattern "-" (common, e.g. pkgmgr-1) + if c.startswith(f"{out}-"): + return True + + # Historical special case: repo is "package-manager" but output is "pkgmgr" + if out == "pkgmgr" and c.startswith("package-manager-"): + return True + + return False + + +def entry_matches_store_path(entry: NixProfileEntry, store_path: str) -> bool: + needle = (store_path or "").strip() + if not needle: + return False + return any((p or "") == needle for p in entry.store_paths) + + +def stable_unique_ints(values: List[int]) -> List[int]: + seen: set[int] = set() + uniq: List[int] = [] + for v in values: + if v in seen: + continue + uniq.append(v) + seen.add(v) + return uniq diff --git a/src/pkgmgr/actions/install/installers/nix/profile/models.py b/src/pkgmgr/actions/install/installers/nix/profile/models.py new file mode 100644 index 0000000..1a23d62 --- /dev/null +++ b/src/pkgmgr/actions/install/installers/nix/profile/models.py @@ -0,0 +1,17 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import List, Optional + + +@dataclass(frozen=True) +class NixProfileEntry: + """ + Minimal normalized representation of one nix profile element entry. + """ + + key: str + index: Optional[int] + name: str + attr_path: str + store_paths: List[str] diff --git a/src/pkgmgr/actions/install/installers/nix/profile/normalizer.py b/src/pkgmgr/actions/install/installers/nix/profile/normalizer.py new file mode 100644 index 0000000..12afe9c --- /dev/null +++ b/src/pkgmgr/actions/install/installers/nix/profile/normalizer.py @@ -0,0 +1,128 @@ +from __future__ import annotations + +import re +from typing import Any, Dict, Iterable, List, Optional + +from .models import NixProfileEntry + + +def coerce_index(key: str, entry: Dict[str, Any]) -> Optional[int]: + """ + Nix JSON schema varies: + - elements keys might be "0", "1", ... + - or might be names like "pkgmgr-1" + Some versions include an explicit index field. + We try safe options in order. + """ + k = (key or "").strip() + + # 1) Classic: numeric keys + if k.isdigit(): + try: + return int(k) + except Exception: + return None + + # 2) Explicit index fields (schema-dependent) + for field in ("index", "id", "position"): + v = entry.get(field) + if isinstance(v, int): + return v + if isinstance(v, str) and v.strip().isdigit(): + try: + return int(v.strip()) + except Exception: + pass + + # 3) Last resort: extract trailing number from key if it looks like "-" + m = re.match(r"^.+-(\d+)$", k) + if m: + try: + return int(m.group(1)) + except Exception: + return None + + return None + + +def iter_store_paths(entry: Dict[str, Any]) -> Iterable[str]: + """ + Yield all possible store paths from a nix profile JSON entry. + + Nix has had schema shifts. We support common variants: + - "storePaths": ["/nix/store/..", ...] + - "storePaths": "/nix/store/.." (rare) + - "storePath": "/nix/store/.." (some variants) + - nested "outputs" dict(s) with store paths (best-effort) + """ + if not isinstance(entry, dict): + return + + sp = entry.get("storePaths") + if isinstance(sp, list): + for p in sp: + if isinstance(p, str): + yield p + elif isinstance(sp, str): + yield sp + + sp2 = entry.get("storePath") + if isinstance(sp2, str): + yield sp2 + + outs = entry.get("outputs") + if isinstance(outs, dict): + for _, ov in outs.items(): + if isinstance(ov, dict): + p = ov.get("storePath") + if isinstance(p, str): + yield p + + +def normalize_store_path(store_path: str) -> str: + """ + Normalize store path for matching. + Currently just strips whitespace; hook for future normalization if needed. + """ + return (store_path or "").strip() + + +def normalize_elements(data: Dict[str, Any]) -> List[NixProfileEntry]: + """ + Converts nix profile list JSON into a list of normalized entries. + + JSON formats observed: + - {"elements": {"0": {...}, "1": {...}}} + - {"elements": {"pkgmgr-1": {...}, "pkgmgr-2": {...}}} + """ + elements = data.get("elements") + if not isinstance(elements, dict): + return [] + + normalized: List[NixProfileEntry] = [] + + for k, entry in elements.items(): + if not isinstance(entry, dict): + continue + + idx = coerce_index(str(k), entry) + name = str(entry.get("name", "") or "") + attr = str(entry.get("attrPath", "") or "") + + store_paths: List[str] = [] + for p in iter_store_paths(entry): + sp = normalize_store_path(p) + if sp: + store_paths.append(sp) + + normalized.append( + NixProfileEntry( + key=str(k), + index=idx, + name=name, + attr_path=attr, + store_paths=store_paths, + ) + ) + + return normalized diff --git a/src/pkgmgr/actions/install/installers/nix/profile/parser.py b/src/pkgmgr/actions/install/installers/nix/profile/parser.py new file mode 100644 index 0000000..c7e5a89 --- /dev/null +++ b/src/pkgmgr/actions/install/installers/nix/profile/parser.py @@ -0,0 +1,19 @@ +from __future__ import annotations + +import json +from typing import Any, Dict + + +def parse_profile_list_json(raw: str) -> Dict[str, Any]: + """ + Parse JSON output from `nix profile list --json`. + + Raises SystemExit with a helpful excerpt on parse failure. + """ + try: + return json.loads(raw) + except json.JSONDecodeError as e: + excerpt = (raw or "")[:5000] + raise SystemExit( + f"[nix] Failed to parse `nix profile list --json`: {e}\n{excerpt}" + ) from e diff --git a/src/pkgmgr/actions/install/installers/nix/profile/result.py b/src/pkgmgr/actions/install/installers/nix/profile/result.py new file mode 100644 index 0000000..f8a3dc7 --- /dev/null +++ b/src/pkgmgr/actions/install/installers/nix/profile/result.py @@ -0,0 +1,28 @@ +from __future__ import annotations + +from typing import Any + + +def extract_stdout_text(result: Any) -> str: + """ + Normalize different runner return types to a stdout string. + + Supported patterns: + - result is str -> returned as-is + - result is bytes/bytearray -> decoded UTF-8 (replace errors) + - result has `.stdout` (str or bytes) -> used + - fallback: str(result) + """ + if isinstance(result, str): + return result + + if isinstance(result, (bytes, bytearray)): + return bytes(result).decode("utf-8", errors="replace") + + stdout = getattr(result, "stdout", None) + if isinstance(stdout, str): + return stdout + if isinstance(stdout, (bytes, bytearray)): + return bytes(stdout).decode("utf-8", errors="replace") + + return str(result) diff --git a/src/pkgmgr/actions/install/installers/nix/profile_list.py b/src/pkgmgr/actions/install/installers/nix/profile_list.py new file mode 100644 index 0000000..7a0835e --- /dev/null +++ b/src/pkgmgr/actions/install/installers/nix/profile_list.py @@ -0,0 +1,69 @@ +from __future__ import annotations + +import re +from typing import TYPE_CHECKING, List, Tuple + +from .runner import CommandRunner + +if TYPE_CHECKING: + from pkgmgr.actions.install.context import RepoContext + + +class NixProfileListReader: + def __init__(self, runner: CommandRunner) -> None: + self._runner = runner + + @staticmethod + def _store_prefix(path: str) -> str: + raw = (path or "").strip() + m = re.match(r"^(/nix/store/[0-9a-z]{32}-[^/ \t]+)", raw) + return m.group(1) if m else raw + + def entries(self, ctx: "RepoContext") -> List[Tuple[int, str]]: + res = self._runner.run(ctx, "nix profile list", allow_failure=True) + if res.returncode != 0: + return [] + + entries: List[Tuple[int, str]] = [] + pat = re.compile( + r"^\s*(\d+)\s+.*?(/nix/store/[0-9a-z]{32}-[^/ \t]+)", + re.MULTILINE, + ) + + for m in pat.finditer(res.stdout or ""): + idx_s = m.group(1) + sp = m.group(2) + try: + idx = int(idx_s) + except Exception: + continue + entries.append((idx, self._store_prefix(sp))) + + seen: set[int] = set() + uniq: List[Tuple[int, str]] = [] + for idx, sp in entries: + if idx not in seen: + seen.add(idx) + uniq.append((idx, sp)) + + return uniq + + def indices_matching_store_prefixes(self, ctx: "RepoContext", prefixes: List[str]) -> List[int]: + prefixes = [self._store_prefix(p) for p in prefixes if p] + prefixes = [p for p in prefixes if p] + if not prefixes: + return [] + + hits: List[int] = [] + for idx, sp in self.entries(ctx): + if any(sp == p for p in prefixes): + hits.append(idx) + + seen: set[int] = set() + uniq: List[int] = [] + for i in hits: + if i not in seen: + seen.add(i) + uniq.append(i) + + return uniq diff --git a/src/pkgmgr/actions/install/installers/nix/textparse.py b/src/pkgmgr/actions/install/installers/nix/textparse.py new file mode 100644 index 0000000..862fbd0 --- /dev/null +++ b/src/pkgmgr/actions/install/installers/nix/textparse.py @@ -0,0 +1,76 @@ +from __future__ import annotations + +import re +from typing import List + + +class NixConflictTextParser: + @staticmethod + def _store_prefix(path: str) -> str: + raw = (path or "").strip() + m = re.match(r"^(/nix/store/[0-9a-z]{32}-[^/ \t]+)", raw) + return m.group(1) if m else raw + + def remove_tokens(self, text: str) -> List[str]: + pat = re.compile( + r"^\s*nix profile remove\s+([^\s'\"`]+|'[^']+'|\"[^\"]+\")\s*$", + re.MULTILINE, + ) + + tokens: List[str] = [] + for m in pat.finditer(text or ""): + t = (m.group(1) or "").strip() + if (t.startswith("'") and t.endswith("'")) or (t.startswith('"') and t.endswith('"')): + t = t[1:-1] + if t: + tokens.append(t) + + seen: set[str] = set() + uniq: List[str] = [] + for t in tokens: + if t not in seen: + seen.add(t) + uniq.append(t) + + return uniq + + def existing_store_prefixes(self, text: str) -> List[str]: + lines = (text or "").splitlines() + prefixes: List[str] = [] + + in_existing = False + in_new = False + + store_pat = re.compile(r"^\s*(/nix/store/[0-9a-z]{32}-[^ \t]+)") + + for raw in lines: + line = raw.strip() + + if "An existing package already provides the following file" in line: + in_existing = True + in_new = False + continue + + if "This is the conflicting file from the new package" in line: + in_existing = False + in_new = True + continue + + if in_existing: + m = store_pat.match(raw) + if m: + prefixes.append(m.group(1)) + continue + + _ = in_new + + norm = [self._store_prefix(p) for p in prefixes if p] + + seen: set[str] = set() + uniq: List[str] = [] + for p in norm: + if p and p not in seen: + seen.add(p) + uniq.append(p) + + return uniq diff --git a/tests/integration/test_nix_profile_list_json.py b/tests/integration/test_nix_profile_list_json.py new file mode 100644 index 0000000..d29e51a --- /dev/null +++ b/tests/integration/test_nix_profile_list_json.py @@ -0,0 +1,63 @@ +from __future__ import annotations + +import json +import unittest +from dataclasses import dataclass + + +@dataclass +class FakeRunResult: + """ + Mimics your runner returning a structured result object. + """ + returncode: int + stdout: str + stderr: str = "" + + +class FakeRunner: + """ + Minimal runner stub: returns exactly what we configure. + """ + def __init__(self, result): + self._result = result + + def run(self, ctx, cmd: str, allow_failure: bool = False): + return self._result + + +class TestE2ENixProfileListJsonParsing(unittest.TestCase): + """ + This test verifies that NixProfileInspector can parse `nix profile list --json` + regardless of whether the CommandRunner returns: + - raw stdout string, OR + - a RunResult-like object with a `.stdout` attribute. + """ + + def test_list_json_accepts_raw_string(self) -> None: + from pkgmgr.actions.install.installers.nix.profile import NixProfileInspector + + payload = {"elements": {"pkgmgr-1": {"attrPath": "packages.x86_64-linux.pkgmgr"}}} + raw = json.dumps(payload) + + runner = FakeRunner(raw) + inspector = NixProfileInspector() + + data = inspector.list_json(ctx=None, runner=runner) + self.assertEqual(data["elements"]["pkgmgr-1"]["attrPath"], "packages.x86_64-linux.pkgmgr") + + def test_list_json_accepts_runresult_object(self) -> None: + from pkgmgr.actions.install.installers.nix.profile import NixProfileInspector + + payload = {"elements": {"pkgmgr-1": {"attrPath": "packages.x86_64-linux.pkgmgr"}}} + raw = json.dumps(payload) + + runner = FakeRunner(FakeRunResult(returncode=0, stdout=raw)) + inspector = NixProfileInspector() + + data = inspector.list_json(ctx=None, runner=runner) + self.assertEqual(data["elements"]["pkgmgr-1"]["attrPath"], "packages.x86_64-linux.pkgmgr") + + +if __name__ == "__main__": + unittest.main()