refactor(nix): split NixFlakeInstaller into atomic modules and add GitHub 403 retry handling
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / codesniffer-shellcheck (push) Has been cancelled
Mark stable commit / codesniffer-ruff (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled

- Move Nix flake installer into installers/nix/ with atomic components
  (installer, runner, profile, retry, types)
- Preserve legacy behavior and semantics of NixFlakeInstaller
- Add GitHub API 403 rate-limit retry with Fibonacci backoff + jitter
- Update all imports to new nix module path
- Rename legacy unit tests and adapt patches to new structure
- Add unit test for simulated GitHub 403 retry without realtime sleeping

https://chatgpt.com/share/693e925d-a79c-800f-b0b6-92b8ba260b11
This commit is contained in:
Kevin Veen-Birkenbach
2025-12-14 11:32:48 +01:00
parent 3990560cd7
commit d55c8d3726
13 changed files with 524 additions and 263 deletions

View File

@@ -28,7 +28,7 @@ from pkgmgr.actions.install.installers.os_packages import (
DebianControlInstaller,
RpmSpecInstaller,
)
from pkgmgr.actions.install.installers.nix_flake import (
from pkgmgr.actions.install.installers.nix import (
NixFlakeInstaller,
)
from pkgmgr.actions.install.installers.python import PythonInstaller

View File

@@ -9,7 +9,7 @@ pkgmgr.actions.install.installers.
"""
from pkgmgr.actions.install.installers.base import BaseInstaller # noqa: F401
from pkgmgr.actions.install.installers.nix_flake import NixFlakeInstaller # noqa: F401
from pkgmgr.actions.install.installers.nix import NixFlakeInstaller # noqa: F401
from pkgmgr.actions.install.installers.python import PythonInstaller # noqa: F401
from pkgmgr.actions.install.installers.makefile import MakefileInstaller # noqa: F401

View File

@@ -0,0 +1,4 @@
from .installer import NixFlakeInstaller
from .retry import RetryPolicy
__all__ = ["NixFlakeInstaller", "RetryPolicy"]

View File

@@ -0,0 +1,166 @@
# src/pkgmgr/actions/install/installers/nix/installer.py
from __future__ import annotations
import os
import shutil
from typing import List, Tuple
from pkgmgr.actions.install.installers.base import BaseInstaller
from .profile import NixProfileInspector
from .retry import GitHubRateLimitRetry, RetryPolicy
from .runner import CommandRunner
class NixFlakeInstaller(BaseInstaller):
layer = "nix"
FLAKE_FILE = "flake.nix"
def __init__(self, policy: RetryPolicy | None = None) -> None:
self._runner = CommandRunner()
self._retry = GitHubRateLimitRetry(policy=policy)
self._profile = NixProfileInspector()
# ------------------------------------------------------------------ #
# Compatibility: supports()
# ------------------------------------------------------------------ #
def supports(self, ctx: "RepoContext") -> bool:
if os.environ.get("PKGMGR_DISABLE_NIX_FLAKE_INSTALLER") == "1":
if not ctx.quiet:
print("[INFO] PKGMGR_DISABLE_NIX_FLAKE_INSTALLER=1 skipping NixFlakeInstaller.")
return False
if shutil.which("nix") is None:
return False
return os.path.exists(os.path.join(ctx.repo_dir, self.FLAKE_FILE))
# ------------------------------------------------------------------ #
# Compatibility: output selection
# ------------------------------------------------------------------ #
def _profile_outputs(self, ctx: "RepoContext") -> List[Tuple[str, bool]]:
# (output_name, allow_failure)
if ctx.identifier in {"pkgmgr", "package-manager"}:
return [("pkgmgr", False), ("default", True)]
return [("default", False)]
# ------------------------------------------------------------------ #
# Compatibility: run()
# ------------------------------------------------------------------ #
def run(self, ctx: "RepoContext") -> None:
if not self.supports(ctx):
return
outputs = self._profile_outputs(ctx)
if not ctx.quiet:
print(
"[nix] flake detected in "
f"{ctx.identifier}, ensuring outputs: "
+ ", ".join(name for name, _ in outputs)
)
for output, allow_failure in outputs:
if ctx.force_update:
self._force_upgrade_output(ctx, output, allow_failure)
else:
self._install_only(ctx, output, allow_failure)
# ------------------------------------------------------------------ #
# Core logic (unchanged semantics)
# ------------------------------------------------------------------ #
def _installable(self, ctx: "RepoContext", output: str) -> str:
return f"{ctx.repo_dir}#{output}"
def _install_only(self, ctx: "RepoContext", output: str, allow_failure: bool) -> None:
install_cmd = f"nix profile install {self._installable(ctx, output)}"
if not ctx.quiet:
print(f"[nix] install: {install_cmd}")
res = self._retry.run_with_retry(ctx, self._runner, install_cmd)
if res.returncode == 0:
if not ctx.quiet:
print(f"[nix] output '{output}' successfully installed.")
return
if not ctx.quiet:
print(
f"[nix] install failed for '{output}' (exit {res.returncode}), "
"trying index-based upgrade/remove+install..."
)
indices = self._profile.find_installed_indices_for_output(ctx, self._runner, output)
upgraded = False
for idx in indices:
if self._upgrade_index(ctx, idx):
upgraded = True
if not ctx.quiet:
print(f"[nix] output '{output}' successfully upgraded (index {idx}).")
if upgraded:
return
if indices and not ctx.quiet:
print(f"[nix] upgrade failed; removing indices {indices} and reinstalling '{output}'.")
for idx in indices:
self._remove_index(ctx, idx)
final = self._runner.run(ctx, install_cmd, allow_failure=True)
if final.returncode == 0:
if not ctx.quiet:
print(f"[nix] output '{output}' successfully re-installed.")
return
print(f"[ERROR] Failed to install Nix flake output '{output}' (exit {final.returncode})")
if not allow_failure:
raise SystemExit(final.returncode)
print(f"[WARNING] Continuing despite failure of optional output '{output}'.")
# ------------------------------------------------------------------ #
# force_update path (unchanged semantics)
# ------------------------------------------------------------------ #
def _force_upgrade_output(self, ctx: "RepoContext", output: str, allow_failure: bool) -> None:
indices = self._profile.find_installed_indices_for_output(ctx, self._runner, output)
upgraded_any = False
for idx in indices:
if self._upgrade_index(ctx, idx):
upgraded_any = True
if not ctx.quiet:
print(f"[nix] output '{output}' successfully upgraded (index {idx}).")
if upgraded_any:
print(f"[nix] output '{output}' successfully upgraded.")
return
if indices and not ctx.quiet:
print(f"[nix] upgrade failed; removing indices {indices} and reinstalling '{output}'.")
for idx in indices:
self._remove_index(ctx, idx)
self._install_only(ctx, output, allow_failure)
print(f"[nix] output '{output}' successfully upgraded.")
# ------------------------------------------------------------------ #
# Helpers
# ------------------------------------------------------------------ #
def _upgrade_index(self, ctx: "RepoContext", idx: int) -> bool:
res = self._runner.run(ctx, f"nix profile upgrade --refresh {idx}", allow_failure=True)
return res.returncode == 0
def _remove_index(self, ctx: "RepoContext", idx: int) -> None:
self._runner.run(ctx, f"nix profile remove {idx}", allow_failure=True)

View File

@@ -0,0 +1,67 @@
# src/pkgmgr/actions/install/installers/nix/profile.py
from __future__ import annotations
import json
from typing import Any, List
class NixProfileInspector:
"""
Reads and interprets `nix profile list --json` and provides helpers for
finding indices matching a given output name.
"""
def find_installed_indices_for_output(self, ctx: "RepoContext", runner: "CommandRunner", output: str) -> List[int]:
res = runner.run(ctx, "nix profile list --json", allow_failure=True)
if res.returncode != 0:
return []
try:
data = json.loads(res.stdout or "{}")
except json.JSONDecodeError:
return []
indices: List[int] = []
elements = data.get("elements")
if isinstance(elements, dict):
for idx_str, elem in elements.items():
try:
idx = int(idx_str)
except (TypeError, ValueError):
continue
if self._element_matches_output(elem, output):
indices.append(idx)
return sorted(indices)
if isinstance(elements, list):
for elem in elements:
idx = elem.get("index") if isinstance(elem, dict) else None
if isinstance(idx, int) and self._element_matches_output(elem, output):
indices.append(idx)
return sorted(indices)
return []
@staticmethod
def element_matches_output(elem: Any, output: str) -> bool:
return NixProfileInspector._element_matches_output(elem, output)
@staticmethod
def _element_matches_output(elem: Any, output: str) -> bool:
out = (output or "").strip()
if not out or not isinstance(elem, dict):
return False
candidates: List[str] = []
for k in ("attrPath", "originalUrl", "url", "storePath", "name"):
v = elem.get(k)
if isinstance(v, str) and v:
candidates.append(v)
for c in candidates:
if c == out:
return True
if f"#{out}" in c:
return True
return False

View File

@@ -0,0 +1,85 @@
# src/pkgmgr/actions/install/installers/nix/retry.py
from __future__ import annotations
import random
import time
from dataclasses import dataclass
from typing import Iterable
from .types import RunResult
@dataclass(frozen=True)
class RetryPolicy:
max_attempts: int = 7
base_delay_seconds: int = 30
jitter_seconds_min: int = 0
jitter_seconds_max: int = 60
class GitHubRateLimitRetry:
"""
Retries nix install commands only when the error looks like a GitHub API rate limit (HTTP 403).
Backoff: Fibonacci(base, base, ...) + random jitter.
"""
def __init__(self, policy: RetryPolicy | None = None) -> None:
self._policy = policy or RetryPolicy()
def run_with_retry(
self,
ctx: "RepoContext",
runner: "CommandRunner",
install_cmd: str,
) -> RunResult:
quiet = bool(getattr(ctx, "quiet", False))
delays = list(self._fibonacci_backoff(self._policy.base_delay_seconds, self._policy.max_attempts))
last: RunResult | None = None
for attempt, base_delay in enumerate(delays, start=1):
if not quiet:
print(f"[nix] attempt {attempt}/{self._policy.max_attempts}: {install_cmd}")
res = runner.run(ctx, install_cmd, allow_failure=True)
last = res
if res.returncode == 0:
return res
combined = f"{res.stdout}\n{res.stderr}"
if not self._is_github_rate_limit_error(combined):
return res
if attempt >= self._policy.max_attempts:
break
jitter = random.randint(self._policy.jitter_seconds_min, self._policy.jitter_seconds_max)
wait_time = base_delay + jitter
if not quiet:
print(
"[nix] GitHub rate limit detected (403). "
f"Retrying in {wait_time}s (base={base_delay}s, jitter={jitter}s)..."
)
time.sleep(wait_time)
return last if last is not None else RunResult(returncode=1, stdout="", stderr="nix install retry failed")
@staticmethod
def _is_github_rate_limit_error(text: str) -> bool:
t = (text or "").lower()
return (
"http error 403" in t
or "rate limit exceeded" in t
or "github api rate limit" in t
or "api rate limit exceeded" in t
)
@staticmethod
def _fibonacci_backoff(base: int, attempts: int) -> Iterable[int]:
a, b = base, base
for _ in range(max(1, attempts)):
yield a
a, b = b, a + b

View File

@@ -0,0 +1,60 @@
from __future__ import annotations
import subprocess
from .types import RunResult
class CommandRunner:
"""
Executes commands (shell=True) inside a repository directory (if provided).
Supports preview mode and compact failure output logging.
"""
def run(self, ctx: "RepoContext", cmd: str, allow_failure: bool) -> RunResult:
repo_dir = getattr(ctx, "repo_dir", None) or getattr(ctx, "repo_path", None)
preview = bool(getattr(ctx, "preview", False))
quiet = bool(getattr(ctx, "quiet", False))
if preview:
if not quiet:
print(f"[preview] {cmd}")
return RunResult(returncode=0, stdout="", stderr="")
try:
p = subprocess.run(
cmd,
shell=True,
cwd=repo_dir,
check=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
except Exception as e:
if not allow_failure:
raise
return RunResult(returncode=1, stdout="", stderr=str(e))
res = RunResult(returncode=p.returncode, stdout=p.stdout or "", stderr=p.stderr or "")
if res.returncode != 0 and not quiet:
self._print_compact_failure(res)
if res.returncode != 0 and not allow_failure:
raise SystemExit(res.returncode)
return res
@staticmethod
def _print_compact_failure(res: RunResult) -> None:
out = (res.stdout or "").strip()
err = (res.stderr or "").strip()
if out:
print("[nix] stdout (last lines):")
print("\n".join(out.splitlines()[-20:]))
if err:
print("[nix] stderr (last lines):")
print("\n".join(err.splitlines()[-40:]))

View File

@@ -0,0 +1,10 @@
from __future__ import annotations
from dataclasses import dataclass
@dataclass(frozen=True)
class RunResult:
returncode: int
stdout: str
stderr: str

View File

@@ -1,238 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import json
import os
import shutil
import subprocess
from typing import TYPE_CHECKING, List, Tuple
from pkgmgr.actions.install.installers.base import BaseInstaller
from pkgmgr.core.command.run import run_command
if TYPE_CHECKING:
from pkgmgr.actions.install.context import RepoContext
class NixFlakeInstaller(BaseInstaller):
layer = "nix"
FLAKE_FILE = "flake.nix"
def supports(self, ctx: "RepoContext") -> bool:
if os.environ.get("PKGMGR_DISABLE_NIX_FLAKE_INSTALLER") == "1":
if not ctx.quiet:
print("[INFO] PKGMGR_DISABLE_NIX_FLAKE_INSTALLER=1 skipping NixFlakeInstaller.")
return False
if shutil.which("nix") is None:
return False
return os.path.exists(os.path.join(ctx.repo_dir, self.FLAKE_FILE))
def _profile_outputs(self, ctx: "RepoContext") -> List[Tuple[str, bool]]:
# (output_name, allow_failure)
if ctx.identifier in {"pkgmgr", "package-manager"}:
return [("pkgmgr", False), ("default", True)]
return [("default", False)]
def _installable(self, ctx: "RepoContext", output: str) -> str:
return f"{ctx.repo_dir}#{output}"
def _run(self, ctx: "RepoContext", cmd: str, allow_failure: bool = True):
return run_command(
cmd,
cwd=ctx.repo_dir,
preview=ctx.preview,
allow_failure=allow_failure,
)
def _profile_list_json(self, ctx: "RepoContext") -> dict:
"""
Read current Nix profile entries as JSON (best-effort).
NOTE: Nix versions differ:
- Newer: {"elements": [ { "index": 0, "attrPath": "...", ... }, ... ]}
- Older: {"elements": [ "nixpkgs#hello", ... ]} (strings)
We return {} on failure or in preview mode.
"""
if ctx.preview:
return {}
proc = subprocess.run(
["nix", "profile", "list", "--json"],
check=False,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
env=os.environ.copy(),
)
if proc.returncode != 0:
return {}
try:
return json.loads(proc.stdout or "{}")
except json.JSONDecodeError:
return {}
def _find_installed_indices_for_output(self, ctx: "RepoContext", output: str) -> List[int]:
"""
Find installed profile indices for a given output.
Works across Nix JSON variants:
- If elements are dicts: we can extract indices.
- If elements are strings: we cannot extract indices -> return [].
"""
data = self._profile_list_json(ctx)
elements = data.get("elements", []) or []
matches: List[int] = []
for el in elements:
# Legacy JSON format: plain strings -> no index information
if not isinstance(el, dict):
continue
idx = el.get("index")
if idx is None:
continue
attr_path = el.get("attrPath") or el.get("attr_path") or ""
pname = el.get("pname") or ""
name = el.get("name") or ""
if attr_path == output:
matches.append(int(idx))
continue
if pname == output or name == output:
matches.append(int(idx))
continue
if isinstance(attr_path, str) and attr_path.endswith(f".{output}"):
matches.append(int(idx))
continue
return matches
def _upgrade_index(self, ctx: "RepoContext", index: int) -> bool:
cmd = f"nix profile upgrade --refresh {index}"
if not ctx.quiet:
print(f"[nix] upgrade: {cmd}")
res = self._run(ctx, cmd, allow_failure=True)
return res.returncode == 0
def _remove_index(self, ctx: "RepoContext", index: int) -> None:
cmd = f"nix profile remove {index}"
if not ctx.quiet:
print(f"[nix] remove: {cmd}")
self._run(ctx, cmd, allow_failure=True)
def _install_only(self, ctx: "RepoContext", output: str, allow_failure: bool) -> None:
"""
Install output; on failure, try index-based upgrade/remove+install if possible.
"""
installable = self._installable(ctx, output)
install_cmd = f"nix profile install {installable}"
if not ctx.quiet:
print(f"[nix] install: {install_cmd}")
res = self._run(ctx, install_cmd, allow_failure=True)
if res.returncode == 0:
if not ctx.quiet:
print(f"[nix] output '{output}' successfully installed.")
return
if not ctx.quiet:
print(
f"[nix] install failed for '{output}' (exit {res.returncode}), "
"trying index-based upgrade/remove+install..."
)
indices = self._find_installed_indices_for_output(ctx, output)
# 1) Try upgrading existing indices (only possible on newer JSON format)
upgraded = False
for idx in indices:
if self._upgrade_index(ctx, idx):
upgraded = True
if not ctx.quiet:
print(f"[nix] output '{output}' successfully upgraded (index {idx}).")
if upgraded:
return
# 2) Remove matching indices and retry install
if indices and not ctx.quiet:
print(f"[nix] upgrade failed; removing indices {indices} and reinstalling '{output}'.")
for idx in indices:
self._remove_index(ctx, idx)
final = self._run(ctx, install_cmd, allow_failure=True)
if final.returncode == 0:
if not ctx.quiet:
print(f"[nix] output '{output}' successfully re-installed.")
return
msg = f"[ERROR] Failed to install Nix flake output '{output}' (exit {final.returncode})"
print(msg)
if not allow_failure:
raise SystemExit(final.returncode)
print(f"[WARNING] Continuing despite failure of optional output '{output}'.")
def _force_upgrade_output(self, ctx: "RepoContext", output: str, allow_failure: bool) -> None:
"""
force_update path:
- Prefer upgrading existing entries via indices (if we can discover them).
- If no indices (legacy JSON) or upgrade fails, fall back to install-only logic.
"""
indices = self._find_installed_indices_for_output(ctx, output)
upgraded_any = False
for idx in indices:
if self._upgrade_index(ctx, idx):
upgraded_any = True
if not ctx.quiet:
print(f"[nix] output '{output}' successfully upgraded (index {idx}).")
if upgraded_any:
# Make upgrades visible to tests
print(f"[nix] output '{output}' successfully upgraded.")
return
if indices and not ctx.quiet:
print(f"[nix] upgrade failed; removing indices {indices} and reinstalling '{output}'.")
for idx in indices:
self._remove_index(ctx, idx)
# Ensure installed (includes its own fallback logic)
self._install_only(ctx, output, allow_failure)
# Make upgrades visible to tests (semantic: update requested)
print(f"[nix] output '{output}' successfully upgraded.")
def run(self, ctx: "RepoContext") -> None:
if not self.supports(ctx):
return
outputs = self._profile_outputs(ctx)
if not ctx.quiet:
print(
"[nix] flake detected in "
f"{ctx.identifier}, ensuring outputs: "
+ ", ".join(name for name, _ in outputs)
)
for output, allow_failure in outputs:
if ctx.force_update:
self._force_upgrade_output(ctx, output, allow_failure)
else:
self._install_only(ctx, output, allow_failure)