*fix(nix): resolve nix profile conflicts without numeric indices and fix update pkgmgr system test*
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / linter-shell (push) Has been cancelled
Mark stable commit / linter-python (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled

* Switch conflict handling from index-based removal to token-based removal (*nix profile remove <name>*) for newer nix versions
* Add robust parsing of *nix profile list --json* with normalization and heuristics for output/name matching
* Detect at runtime whether numeric profile indices are supported and fall back automatically when they are not
* Ensure *pkgmgr* / *package-manager* flake outputs are correctly identified and cleaned up during reinstall
* Fix failing E2E test *test_update_pkgmgr_shallow_pkgmgr_with_system* by reliably removing conflicting profile entries before reinstall

https://chatgpt.com/share/693efae5-b8bc-800f-94e3-28c93b74ed7b
This commit is contained in:
Kevin Veen-Birkenbach
2025-12-14 18:58:29 +01:00
parent 4883e40812
commit 06a6a77a48
13 changed files with 833 additions and 115 deletions

View File

@@ -0,0 +1,100 @@
from __future__ import annotations
from typing import TYPE_CHECKING, List
from .profile import NixProfileInspector
from .retry import GitHubRateLimitRetry
from .runner import CommandRunner
from .textparse import NixConflictTextParser
if TYPE_CHECKING:
from pkgmgr.actions.install.context import RepoContext
class NixConflictResolver:
"""
Resolves nix profile file conflicts by:
1. Parsing conflicting store paths from stderr
2. Mapping them to profile remove tokens via `nix profile list --json`
3. Removing those tokens deterministically
4. Retrying install
"""
def __init__(
self,
runner: CommandRunner,
retry: GitHubRateLimitRetry,
profile: NixProfileInspector,
) -> None:
self._runner = runner
self._retry = retry
self._profile = profile
self._parser = NixConflictTextParser()
def resolve(
self,
ctx: "RepoContext",
install_cmd: str,
stdout: str,
stderr: str,
*,
output: str,
max_rounds: int = 10,
) -> bool:
quiet = bool(getattr(ctx, "quiet", False))
combined = f"{stdout}\n{stderr}"
for _ in range(max_rounds):
# 1) Extract conflicting store prefixes from nix error output
store_prefixes = self._parser.existing_store_prefixes(combined)
# 2) Resolve them to concrete remove tokens
tokens: List[str] = self._profile.find_remove_tokens_for_store_prefixes(
ctx,
self._runner,
store_prefixes,
)
# 3) Fallback: output-name based lookup (also covers nix suggesting: `nix profile remove pkgmgr`)
if not tokens:
tokens = self._profile.find_remove_tokens_for_output(ctx, self._runner, output)
if tokens:
if not quiet:
print(
"[nix] conflict detected; removing existing profile entries: "
+ ", ".join(tokens)
)
for t in tokens:
# tokens may contain things like "pkgmgr" or "pkgmgr-1" or quoted tokens (we keep raw)
self._runner.run(ctx, f"nix profile remove {t}", allow_failure=True)
res = self._retry.run_with_retry(ctx, self._runner, install_cmd)
if res.returncode == 0:
return True
combined = f"{res.stdout}\n{res.stderr}"
continue
# 4) Last-resort fallback: use textual remove tokens from stderr (“nix profile remove X”)
tokens = self._parser.remove_tokens(combined)
if tokens:
if not quiet:
print("[nix] fallback remove tokens: " + ", ".join(tokens))
for t in tokens:
self._runner.run(ctx, f"nix profile remove {t}", allow_failure=True)
res = self._retry.run_with_retry(ctx, self._runner, install_cmd)
if res.returncode == 0:
return True
combined = f"{res.stdout}\n{res.stderr}"
continue
if not quiet:
print("[nix] conflict detected but could not resolve profile entries to remove.")
return False
return False

View File

@@ -1,12 +1,12 @@
# src/pkgmgr/actions/install/installers/nix/installer.py
from __future__ import annotations
import os
import shutil
from typing import List, Tuple, TYPE_CHECKING
from typing import TYPE_CHECKING, List, Tuple
from pkgmgr.actions.install.installers.base import BaseInstaller
from .conflicts import NixConflictResolver
from .profile import NixProfileInspector
from .retry import GitHubRateLimitRetry, RetryPolicy
from .runner import CommandRunner
@@ -14,6 +14,7 @@ from .runner import CommandRunner
if TYPE_CHECKING:
from pkgmgr.actions.install.context import RepoContext
class NixFlakeInstaller(BaseInstaller):
layer = "nix"
FLAKE_FILE = "flake.nix"
@@ -22,15 +23,18 @@ class NixFlakeInstaller(BaseInstaller):
self._runner = CommandRunner()
self._retry = GitHubRateLimitRetry(policy=policy)
self._profile = NixProfileInspector()
self._conflicts = NixConflictResolver(self._runner, self._retry, self._profile)
# ------------------------------------------------------------------ #
# Compatibility: supports()
# ------------------------------------------------------------------ #
# Newer nix rejects numeric indices; we learn this at runtime and cache the decision.
self._indices_supported: bool | None = None
def supports(self, ctx: "RepoContext") -> bool:
if os.environ.get("PKGMGR_DISABLE_NIX_FLAKE_INSTALLER") == "1":
if not ctx.quiet:
print("[INFO] PKGMGR_DISABLE_NIX_FLAKE_INSTALLER=1 skipping NixFlakeInstaller.")
print(
"[INFO] PKGMGR_DISABLE_NIX_FLAKE_INSTALLER=1 "
"skipping NixFlakeInstaller."
)
return False
if shutil.which("nix") is None:
@@ -38,20 +42,12 @@ class NixFlakeInstaller(BaseInstaller):
return os.path.exists(os.path.join(ctx.repo_dir, self.FLAKE_FILE))
# ------------------------------------------------------------------ #
# Compatibility: output selection
# ------------------------------------------------------------------ #
def _profile_outputs(self, ctx: "RepoContext") -> List[Tuple[str, bool]]:
# (output_name, allow_failure)
if ctx.identifier in {"pkgmgr", "package-manager"}:
return [("pkgmgr", False), ("default", True)]
return [("default", False)]
# ------------------------------------------------------------------ #
# Compatibility: run()
# ------------------------------------------------------------------ #
def run(self, ctx: "RepoContext") -> None:
if not self.supports(ctx):
return
@@ -59,11 +55,12 @@ class NixFlakeInstaller(BaseInstaller):
outputs = self._profile_outputs(ctx)
if not ctx.quiet:
print(
msg = (
"[nix] flake detected in "
f"{ctx.identifier}, ensuring outputs: "
+ ", ".join(name for name, _ in outputs)
)
print(msg)
for output, allow_failure in outputs:
if ctx.force_update:
@@ -71,13 +68,13 @@ class NixFlakeInstaller(BaseInstaller):
else:
self._install_only(ctx, output, allow_failure)
# ------------------------------------------------------------------ #
# Core logic (unchanged semantics)
# ------------------------------------------------------------------ #
def _installable(self, ctx: "RepoContext", output: str) -> str:
return f"{ctx.repo_dir}#{output}"
# ---------------------------------------------------------------------
# Core install path
# ---------------------------------------------------------------------
def _install_only(self, ctx: "RepoContext", output: str, allow_failure: bool) -> None:
install_cmd = f"nix profile install {self._installable(ctx, output)}"
@@ -85,35 +82,56 @@ class NixFlakeInstaller(BaseInstaller):
print(f"[nix] install: {install_cmd}")
res = self._retry.run_with_retry(ctx, self._runner, install_cmd)
if res.returncode == 0:
if not ctx.quiet:
print(f"[nix] output '{output}' successfully installed.")
return
# Conflict resolver first (handles the common “existing package already provides file” case)
if self._conflicts.resolve(
ctx,
install_cmd,
res.stdout,
res.stderr,
output=output,
):
if not ctx.quiet:
print(f"[nix] output '{output}' successfully installed after conflict cleanup.")
return
if not ctx.quiet:
print(
f"[nix] install failed for '{output}' (exit {res.returncode}), "
"trying index-based upgrade/remove+install..."
"trying upgrade/remove+install..."
)
indices = self._profile.find_installed_indices_for_output(ctx, self._runner, output)
# If indices are supported, try legacy index-upgrade path.
if self._indices_supported is not False:
indices = self._profile.find_installed_indices_for_output(ctx, self._runner, output)
upgraded = False
for idx in indices:
if self._upgrade_index(ctx, idx):
upgraded = True
if not ctx.quiet:
print(f"[nix] output '{output}' successfully upgraded (index {idx}).")
upgraded = False
for idx in indices:
if self._upgrade_index(ctx, idx):
upgraded = True
if not ctx.quiet:
print(f"[nix] output '{output}' successfully upgraded (index {idx}).")
if upgraded:
return
if upgraded:
return
if indices and not ctx.quiet:
print(f"[nix] upgrade failed; removing indices {indices} and reinstalling '{output}'.")
if indices and not ctx.quiet:
print(f"[nix] upgrade failed; removing indices {indices} and reinstalling '{output}'.")
for idx in indices:
self._remove_index(ctx, idx)
for idx in indices:
self._remove_index(ctx, idx)
# If we learned indices are unsupported, immediately fall back below
if self._indices_supported is False:
self._remove_tokens_for_output(ctx, output)
else:
# indices explicitly unsupported
self._remove_tokens_for_output(ctx, output)
final = self._runner.run(ctx, install_cmd, allow_failure=True)
if final.returncode == 0:
@@ -122,17 +140,24 @@ class NixFlakeInstaller(BaseInstaller):
return
print(f"[ERROR] Failed to install Nix flake output '{output}' (exit {final.returncode})")
if not allow_failure:
raise SystemExit(final.returncode)
print(f"[WARNING] Continuing despite failure of optional output '{output}'.")
# ------------------------------------------------------------------ #
# force_update path (unchanged semantics)
# ------------------------------------------------------------------ #
# ---------------------------------------------------------------------
# force_update path
# ---------------------------------------------------------------------
def _force_upgrade_output(self, ctx: "RepoContext", output: str, allow_failure: bool) -> None:
# Prefer token path if indices unsupported (new nix)
if self._indices_supported is False:
self._remove_tokens_for_output(ctx, output)
self._install_only(ctx, output, allow_failure)
if not ctx.quiet:
print(f"[nix] output '{output}' successfully upgraded.")
return
indices = self._profile.find_installed_indices_for_output(ctx, self._runner, output)
upgraded_any = False
@@ -143,7 +168,8 @@ class NixFlakeInstaller(BaseInstaller):
print(f"[nix] output '{output}' successfully upgraded (index {idx}).")
if upgraded_any:
print(f"[nix] output '{output}' successfully upgraded.")
if not ctx.quiet:
print(f"[nix] output '{output}' successfully upgraded.")
return
if indices and not ctx.quiet:
@@ -152,17 +178,52 @@ class NixFlakeInstaller(BaseInstaller):
for idx in indices:
self._remove_index(ctx, idx)
# If we learned indices are unsupported, also remove by token to actually clear conflicts
if self._indices_supported is False:
self._remove_tokens_for_output(ctx, output)
self._install_only(ctx, output, allow_failure)
print(f"[nix] output '{output}' successfully upgraded.")
if not ctx.quiet:
print(f"[nix] output '{output}' successfully upgraded.")
# ------------------------------------------------------------------ #
# ---------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------ #
# ---------------------------------------------------------------------
def _stderr_says_indices_unsupported(self, stderr: str) -> bool:
s = (stderr or "").lower()
return "no longer supports indices" in s or "does not support indices" in s
def _upgrade_index(self, ctx: "RepoContext", idx: int) -> bool:
res = self._runner.run(ctx, f"nix profile upgrade --refresh {idx}", allow_failure=True)
cmd = f"nix profile upgrade --refresh {idx}"
res = self._runner.run(ctx, cmd, allow_failure=True)
if self._stderr_says_indices_unsupported(getattr(res, "stderr", "")):
self._indices_supported = False
return False
if self._indices_supported is None:
self._indices_supported = True
return res.returncode == 0
def _remove_index(self, ctx: "RepoContext", idx: int) -> None:
self._runner.run(ctx, f"nix profile remove {idx}", allow_failure=True)
res = self._runner.run(ctx, f"nix profile remove {idx}", allow_failure=True)
if self._stderr_says_indices_unsupported(getattr(res, "stderr", "")):
self._indices_supported = False
if self._indices_supported is None:
self._indices_supported = True
def _remove_tokens_for_output(self, ctx: "RepoContext", output: str) -> None:
tokens = self._profile.find_remove_tokens_for_output(ctx, self._runner, output)
if not tokens:
return
if not ctx.quiet:
print(f"[nix] indices unsupported; removing by token(s): {', '.join(tokens)}")
for t in tokens:
self._runner.run(ctx, f"nix profile remove {t}", allow_failure=True)

View File

@@ -1,71 +0,0 @@
from __future__ import annotations
import json
from typing import Any, List, TYPE_CHECKING
if TYPE_CHECKING:
from pkgmgr.actions.install.context import RepoContext
from .runner import CommandRunner
class NixProfileInspector:
"""
Reads and interprets `nix profile list --json` and provides helpers for
finding indices matching a given output name.
"""
def find_installed_indices_for_output(self, ctx: "RepoContext", runner: "CommandRunner", output: str) -> List[int]:
res = runner.run(ctx, "nix profile list --json", allow_failure=True)
if res.returncode != 0:
return []
try:
data = json.loads(res.stdout or "{}")
except json.JSONDecodeError:
return []
indices: List[int] = []
elements = data.get("elements")
if isinstance(elements, dict):
for idx_str, elem in elements.items():
try:
idx = int(idx_str)
except (TypeError, ValueError):
continue
if self._element_matches_output(elem, output):
indices.append(idx)
return sorted(indices)
if isinstance(elements, list):
for elem in elements:
idx = elem.get("index") if isinstance(elem, dict) else None
if isinstance(idx, int) and self._element_matches_output(elem, output):
indices.append(idx)
return sorted(indices)
return []
@staticmethod
def element_matches_output(elem: Any, output: str) -> bool:
return NixProfileInspector._element_matches_output(elem, output)
@staticmethod
def _element_matches_output(elem: Any, output: str) -> bool:
out = (output or "").strip()
if not out or not isinstance(elem, dict):
return False
candidates: List[str] = []
for k in ("attrPath", "originalUrl", "url", "storePath", "name"):
v = elem.get(k)
if isinstance(v, str) and v:
candidates.append(v)
for c in candidates:
if c == out:
return True
if f"#{out}" in c:
return True
return False

View File

@@ -0,0 +1,4 @@
from .inspector import NixProfileInspector
from .models import NixProfileEntry
__all__ = ["NixProfileInspector", "NixProfileEntry"]

View File

@@ -0,0 +1,162 @@
from __future__ import annotations
from typing import Any, List, TYPE_CHECKING
from .matcher import (
entry_matches_output,
entry_matches_store_path,
stable_unique_ints,
)
from .normalizer import normalize_elements
from .parser import parse_profile_list_json
from .result import extract_stdout_text
if TYPE_CHECKING:
# Keep these as TYPE_CHECKING-only to avoid runtime import cycles.
from pkgmgr.actions.install.context import RepoContext
from pkgmgr.core.command.runner import CommandRunner
class NixProfileInspector:
"""
Reads and inspects the user's Nix profile list (JSON).
Public API:
- list_json()
- find_installed_indices_for_output() (legacy; may not work on newer nix)
- find_indices_by_store_path() (legacy; may not work on newer nix)
- find_remove_tokens_for_output()
- find_remove_tokens_for_store_prefixes()
"""
def list_json(self, ctx: "RepoContext", runner: "CommandRunner") -> dict[str, Any]:
res = runner.run(ctx, "nix profile list --json", allow_failure=False)
raw = extract_stdout_text(res)
return parse_profile_list_json(raw)
# ---------------------------------------------------------------------
# Legacy index helpers (still useful on older nix; newer nix may reject indices)
# ---------------------------------------------------------------------
def find_installed_indices_for_output(
self,
ctx: "RepoContext",
runner: "CommandRunner",
output: str,
) -> List[int]:
data = self.list_json(ctx, runner)
entries = normalize_elements(data)
hits: List[int] = []
for e in entries:
if e.index is None:
continue
if entry_matches_output(e, output):
hits.append(e.index)
return stable_unique_ints(hits)
def find_indices_by_store_path(
self,
ctx: "RepoContext",
runner: "CommandRunner",
store_path: str,
) -> List[int]:
needle = (store_path or "").strip()
if not needle:
return []
data = self.list_json(ctx, runner)
entries = normalize_elements(data)
hits: List[int] = []
for e in entries:
if e.index is None:
continue
if entry_matches_store_path(e, needle):
hits.append(e.index)
return stable_unique_ints(hits)
# ---------------------------------------------------------------------
# New token-based helpers (works with newer nix where indices are rejected)
# ---------------------------------------------------------------------
def find_remove_tokens_for_output(
self,
ctx: "RepoContext",
runner: "CommandRunner",
output: str,
) -> List[str]:
"""
Returns profile remove tokens to remove entries matching a given output.
We always include the raw output token first because nix itself suggests:
nix profile remove pkgmgr
"""
out = (output or "").strip()
if not out:
return []
data = self.list_json(ctx, runner)
entries = normalize_elements(data)
tokens: List[str] = [out] # critical: matches nix's own suggestion for conflicts
for e in entries:
if entry_matches_output(e, out):
# Prefer removing by key/name (non-index) when possible.
# New nix rejects numeric indices; these tokens are safer.
k = (e.key or "").strip()
n = (e.name or "").strip()
if k and not k.isdigit():
tokens.append(k)
elif n and not n.isdigit():
tokens.append(n)
# stable unique preserving order
seen: set[str] = set()
uniq: List[str] = []
for t in tokens:
if t and t not in seen:
uniq.append(t)
seen.add(t)
return uniq
def find_remove_tokens_for_store_prefixes(
self,
ctx: "RepoContext",
runner: "CommandRunner",
prefixes: List[str],
) -> List[str]:
"""
Returns remove tokens for entries whose store path matches any prefix.
"""
prefixes = [(p or "").strip() for p in (prefixes or []) if p]
prefixes = [p for p in prefixes if p]
if not prefixes:
return []
data = self.list_json(ctx, runner)
entries = normalize_elements(data)
tokens: List[str] = []
for e in entries:
if not e.store_paths:
continue
if any(sp == p for sp in e.store_paths for p in prefixes):
k = (e.key or "").strip()
n = (e.name or "").strip()
if k and not k.isdigit():
tokens.append(k)
elif n and not n.isdigit():
tokens.append(n)
seen: set[str] = set()
uniq: List[str] = []
for t in tokens:
if t and t not in seen:
uniq.append(t)
seen.add(t)
return uniq

View File

@@ -0,0 +1,62 @@
from __future__ import annotations
from typing import List
from .models import NixProfileEntry
def entry_matches_output(entry: NixProfileEntry, output: str) -> bool:
"""
Heuristic matcher: output is typically a flake output name (e.g. "pkgmgr"),
and we match against name/attrPath patterns.
"""
out = (output or "").strip()
if not out:
return False
candidates = [entry.name, entry.attr_path]
for c in candidates:
c = (c or "").strip()
if not c:
continue
# Direct match
if c == out:
return True
# AttrPath contains "#<output>"
if f"#{out}" in c:
return True
# AttrPath ends with ".<output>"
if c.endswith(f".{out}"):
return True
# Name pattern "<output>-<n>" (common, e.g. pkgmgr-1)
if c.startswith(f"{out}-"):
return True
# Historical special case: repo is "package-manager" but output is "pkgmgr"
if out == "pkgmgr" and c.startswith("package-manager-"):
return True
return False
def entry_matches_store_path(entry: NixProfileEntry, store_path: str) -> bool:
needle = (store_path or "").strip()
if not needle:
return False
return any((p or "") == needle for p in entry.store_paths)
def stable_unique_ints(values: List[int]) -> List[int]:
seen: set[int] = set()
uniq: List[int] = []
for v in values:
if v in seen:
continue
uniq.append(v)
seen.add(v)
return uniq

View File

@@ -0,0 +1,17 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import List, Optional
@dataclass(frozen=True)
class NixProfileEntry:
"""
Minimal normalized representation of one nix profile element entry.
"""
key: str
index: Optional[int]
name: str
attr_path: str
store_paths: List[str]

View File

@@ -0,0 +1,128 @@
from __future__ import annotations
import re
from typing import Any, Dict, Iterable, List, Optional
from .models import NixProfileEntry
def coerce_index(key: str, entry: Dict[str, Any]) -> Optional[int]:
"""
Nix JSON schema varies:
- elements keys might be "0", "1", ...
- or might be names like "pkgmgr-1"
Some versions include an explicit index field.
We try safe options in order.
"""
k = (key or "").strip()
# 1) Classic: numeric keys
if k.isdigit():
try:
return int(k)
except Exception:
return None
# 2) Explicit index fields (schema-dependent)
for field in ("index", "id", "position"):
v = entry.get(field)
if isinstance(v, int):
return v
if isinstance(v, str) and v.strip().isdigit():
try:
return int(v.strip())
except Exception:
pass
# 3) Last resort: extract trailing number from key if it looks like "<name>-<n>"
m = re.match(r"^.+-(\d+)$", k)
if m:
try:
return int(m.group(1))
except Exception:
return None
return None
def iter_store_paths(entry: Dict[str, Any]) -> Iterable[str]:
"""
Yield all possible store paths from a nix profile JSON entry.
Nix has had schema shifts. We support common variants:
- "storePaths": ["/nix/store/..", ...]
- "storePaths": "/nix/store/.." (rare)
- "storePath": "/nix/store/.." (some variants)
- nested "outputs" dict(s) with store paths (best-effort)
"""
if not isinstance(entry, dict):
return
sp = entry.get("storePaths")
if isinstance(sp, list):
for p in sp:
if isinstance(p, str):
yield p
elif isinstance(sp, str):
yield sp
sp2 = entry.get("storePath")
if isinstance(sp2, str):
yield sp2
outs = entry.get("outputs")
if isinstance(outs, dict):
for _, ov in outs.items():
if isinstance(ov, dict):
p = ov.get("storePath")
if isinstance(p, str):
yield p
def normalize_store_path(store_path: str) -> str:
"""
Normalize store path for matching.
Currently just strips whitespace; hook for future normalization if needed.
"""
return (store_path or "").strip()
def normalize_elements(data: Dict[str, Any]) -> List[NixProfileEntry]:
"""
Converts nix profile list JSON into a list of normalized entries.
JSON formats observed:
- {"elements": {"0": {...}, "1": {...}}}
- {"elements": {"pkgmgr-1": {...}, "pkgmgr-2": {...}}}
"""
elements = data.get("elements")
if not isinstance(elements, dict):
return []
normalized: List[NixProfileEntry] = []
for k, entry in elements.items():
if not isinstance(entry, dict):
continue
idx = coerce_index(str(k), entry)
name = str(entry.get("name", "") or "")
attr = str(entry.get("attrPath", "") or "")
store_paths: List[str] = []
for p in iter_store_paths(entry):
sp = normalize_store_path(p)
if sp:
store_paths.append(sp)
normalized.append(
NixProfileEntry(
key=str(k),
index=idx,
name=name,
attr_path=attr,
store_paths=store_paths,
)
)
return normalized

View File

@@ -0,0 +1,19 @@
from __future__ import annotations
import json
from typing import Any, Dict
def parse_profile_list_json(raw: str) -> Dict[str, Any]:
"""
Parse JSON output from `nix profile list --json`.
Raises SystemExit with a helpful excerpt on parse failure.
"""
try:
return json.loads(raw)
except json.JSONDecodeError as e:
excerpt = (raw or "")[:5000]
raise SystemExit(
f"[nix] Failed to parse `nix profile list --json`: {e}\n{excerpt}"
) from e

View File

@@ -0,0 +1,28 @@
from __future__ import annotations
from typing import Any
def extract_stdout_text(result: Any) -> str:
"""
Normalize different runner return types to a stdout string.
Supported patterns:
- result is str -> returned as-is
- result is bytes/bytearray -> decoded UTF-8 (replace errors)
- result has `.stdout` (str or bytes) -> used
- fallback: str(result)
"""
if isinstance(result, str):
return result
if isinstance(result, (bytes, bytearray)):
return bytes(result).decode("utf-8", errors="replace")
stdout = getattr(result, "stdout", None)
if isinstance(stdout, str):
return stdout
if isinstance(stdout, (bytes, bytearray)):
return bytes(stdout).decode("utf-8", errors="replace")
return str(result)

View File

@@ -0,0 +1,69 @@
from __future__ import annotations
import re
from typing import TYPE_CHECKING, List, Tuple
from .runner import CommandRunner
if TYPE_CHECKING:
from pkgmgr.actions.install.context import RepoContext
class NixProfileListReader:
def __init__(self, runner: CommandRunner) -> None:
self._runner = runner
@staticmethod
def _store_prefix(path: str) -> str:
raw = (path or "").strip()
m = re.match(r"^(/nix/store/[0-9a-z]{32}-[^/ \t]+)", raw)
return m.group(1) if m else raw
def entries(self, ctx: "RepoContext") -> List[Tuple[int, str]]:
res = self._runner.run(ctx, "nix profile list", allow_failure=True)
if res.returncode != 0:
return []
entries: List[Tuple[int, str]] = []
pat = re.compile(
r"^\s*(\d+)\s+.*?(/nix/store/[0-9a-z]{32}-[^/ \t]+)",
re.MULTILINE,
)
for m in pat.finditer(res.stdout or ""):
idx_s = m.group(1)
sp = m.group(2)
try:
idx = int(idx_s)
except Exception:
continue
entries.append((idx, self._store_prefix(sp)))
seen: set[int] = set()
uniq: List[Tuple[int, str]] = []
for idx, sp in entries:
if idx not in seen:
seen.add(idx)
uniq.append((idx, sp))
return uniq
def indices_matching_store_prefixes(self, ctx: "RepoContext", prefixes: List[str]) -> List[int]:
prefixes = [self._store_prefix(p) for p in prefixes if p]
prefixes = [p for p in prefixes if p]
if not prefixes:
return []
hits: List[int] = []
for idx, sp in self.entries(ctx):
if any(sp == p for p in prefixes):
hits.append(idx)
seen: set[int] = set()
uniq: List[int] = []
for i in hits:
if i not in seen:
seen.add(i)
uniq.append(i)
return uniq

View File

@@ -0,0 +1,76 @@
from __future__ import annotations
import re
from typing import List
class NixConflictTextParser:
@staticmethod
def _store_prefix(path: str) -> str:
raw = (path or "").strip()
m = re.match(r"^(/nix/store/[0-9a-z]{32}-[^/ \t]+)", raw)
return m.group(1) if m else raw
def remove_tokens(self, text: str) -> List[str]:
pat = re.compile(
r"^\s*nix profile remove\s+([^\s'\"`]+|'[^']+'|\"[^\"]+\")\s*$",
re.MULTILINE,
)
tokens: List[str] = []
for m in pat.finditer(text or ""):
t = (m.group(1) or "").strip()
if (t.startswith("'") and t.endswith("'")) or (t.startswith('"') and t.endswith('"')):
t = t[1:-1]
if t:
tokens.append(t)
seen: set[str] = set()
uniq: List[str] = []
for t in tokens:
if t not in seen:
seen.add(t)
uniq.append(t)
return uniq
def existing_store_prefixes(self, text: str) -> List[str]:
lines = (text or "").splitlines()
prefixes: List[str] = []
in_existing = False
in_new = False
store_pat = re.compile(r"^\s*(/nix/store/[0-9a-z]{32}-[^ \t]+)")
for raw in lines:
line = raw.strip()
if "An existing package already provides the following file" in line:
in_existing = True
in_new = False
continue
if "This is the conflicting file from the new package" in line:
in_existing = False
in_new = True
continue
if in_existing:
m = store_pat.match(raw)
if m:
prefixes.append(m.group(1))
continue
_ = in_new
norm = [self._store_prefix(p) for p in prefixes if p]
seen: set[str] = set()
uniq: List[str] = []
for p in norm:
if p and p not in seen:
seen.add(p)
uniq.append(p)
return uniq

View File

@@ -0,0 +1,63 @@
from __future__ import annotations
import json
import unittest
from dataclasses import dataclass
@dataclass
class FakeRunResult:
"""
Mimics your runner returning a structured result object.
"""
returncode: int
stdout: str
stderr: str = ""
class FakeRunner:
"""
Minimal runner stub: returns exactly what we configure.
"""
def __init__(self, result):
self._result = result
def run(self, ctx, cmd: str, allow_failure: bool = False):
return self._result
class TestE2ENixProfileListJsonParsing(unittest.TestCase):
"""
This test verifies that NixProfileInspector can parse `nix profile list --json`
regardless of whether the CommandRunner returns:
- raw stdout string, OR
- a RunResult-like object with a `.stdout` attribute.
"""
def test_list_json_accepts_raw_string(self) -> None:
from pkgmgr.actions.install.installers.nix.profile import NixProfileInspector
payload = {"elements": {"pkgmgr-1": {"attrPath": "packages.x86_64-linux.pkgmgr"}}}
raw = json.dumps(payload)
runner = FakeRunner(raw)
inspector = NixProfileInspector()
data = inspector.list_json(ctx=None, runner=runner)
self.assertEqual(data["elements"]["pkgmgr-1"]["attrPath"], "packages.x86_64-linux.pkgmgr")
def test_list_json_accepts_runresult_object(self) -> None:
from pkgmgr.actions.install.installers.nix.profile import NixProfileInspector
payload = {"elements": {"pkgmgr-1": {"attrPath": "packages.x86_64-linux.pkgmgr"}}}
raw = json.dumps(payload)
runner = FakeRunner(FakeRunResult(returncode=0, stdout=raw))
inspector = NixProfileInspector()
data = inspector.list_json(ctx=None, runner=runner)
self.assertEqual(data["elements"]["pkgmgr-1"]["attrPath"], "packages.x86_64-linux.pkgmgr")
if __name__ == "__main__":
unittest.main()