Compare commits
7 Commits
d1e5a71f77
...
v1.6.2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
69d28a461d | ||
|
|
03e414cc9f | ||
|
|
7674762c9a | ||
|
|
a47de15e42 | ||
|
|
37f3057d31 | ||
|
|
d55c8d3726 | ||
|
|
3990560cd7 |
30
CHANGELOG.md
30
CHANGELOG.md
@@ -1,3 +1,33 @@
|
||||
## [1.6.2] - 2025-12-14
|
||||
|
||||
* **pkgmgr version** now also shows the installed pkgmgr version when run outside a repository.
|
||||
|
||||
|
||||
## [1.6.1] - 2025-12-14
|
||||
|
||||
* * Added automatic retry handling for GitHub 403 / rate-limit errors during Nix flake installs (Fibonacci backoff with jitter).
|
||||
|
||||
|
||||
## [1.6.0] - 2025-12-14
|
||||
|
||||
* *** Changed ***
|
||||
- Unified update handling via a single top-level `pkgmgr update` command, removing ambiguous update paths.
|
||||
- Improved update reliability by routing all update logic through a central UpdateManager.
|
||||
- Renamed system update flag from `--system-update` to `--system` for clarity and consistency.
|
||||
- Made mirror handling explicit and safer by separating setup, check, and provision responsibilities.
|
||||
- Improved credential resolution for remote providers (environment → keyring → interactive).
|
||||
|
||||
*** Added ***
|
||||
- Optional system updates via `pkgmgr update --system` (Arch, Debian/Ubuntu, Fedora/RHEL).
|
||||
- `pkgmgr install --update` to force re-running installers and refresh existing installations.
|
||||
- Remote repository provisioning for mirrors on supported providers.
|
||||
- Extended end-to-end test coverage for update and mirror workflows.
|
||||
|
||||
*** Fixed ***
|
||||
- Resolved “Unknown repos command: update” errors after CLI refactoring.
|
||||
- Improved Nix update stability and reduced CI failures caused by transient rate limits.
|
||||
|
||||
|
||||
## [1.5.0] - 2025-12-13
|
||||
|
||||
* - Commands now show live output while running, making long operations easier to follow
|
||||
|
||||
2
Makefile
2
Makefile
@@ -44,7 +44,7 @@ install:
|
||||
# ------------------------------------------------------------
|
||||
|
||||
# Default: keep current auto-detection behavior
|
||||
setup: setup-nix setup-venv
|
||||
setup: setup-venv
|
||||
|
||||
# Explicit: developer setup (Python venv + shell RC + install)
|
||||
setup-venv: setup-nix
|
||||
|
||||
@@ -32,7 +32,7 @@
|
||||
rec {
|
||||
pkgmgr = pyPkgs.buildPythonApplication {
|
||||
pname = "package-manager";
|
||||
version = "1.5.0";
|
||||
version = "1.6.2";
|
||||
|
||||
# Use the git repo as source
|
||||
src = ./.;
|
||||
|
||||
@@ -7,7 +7,7 @@ build-backend = "setuptools.build_meta"
|
||||
|
||||
[project]
|
||||
name = "package-manager"
|
||||
version = "1.5.0"
|
||||
version = "1.6.2"
|
||||
description = "Kevin's package-manager tool (pkgmgr)"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.9"
|
||||
@@ -19,7 +19,8 @@ authors = [
|
||||
|
||||
# Base runtime dependencies
|
||||
dependencies = [
|
||||
"PyYAML>=6.0"
|
||||
"PyYAML>=6.0",
|
||||
"tomli; python_version < \"3.11\"",
|
||||
]
|
||||
|
||||
[project.urls]
|
||||
|
||||
@@ -28,7 +28,7 @@ from pkgmgr.actions.install.installers.os_packages import (
|
||||
DebianControlInstaller,
|
||||
RpmSpecInstaller,
|
||||
)
|
||||
from pkgmgr.actions.install.installers.nix_flake import (
|
||||
from pkgmgr.actions.install.installers.nix import (
|
||||
NixFlakeInstaller,
|
||||
)
|
||||
from pkgmgr.actions.install.installers.python import PythonInstaller
|
||||
|
||||
@@ -9,7 +9,7 @@ pkgmgr.actions.install.installers.
|
||||
"""
|
||||
|
||||
from pkgmgr.actions.install.installers.base import BaseInstaller # noqa: F401
|
||||
from pkgmgr.actions.install.installers.nix_flake import NixFlakeInstaller # noqa: F401
|
||||
from pkgmgr.actions.install.installers.nix import NixFlakeInstaller # noqa: F401
|
||||
from pkgmgr.actions.install.installers.python import PythonInstaller # noqa: F401
|
||||
from pkgmgr.actions.install.installers.makefile import MakefileInstaller # noqa: F401
|
||||
|
||||
|
||||
4
src/pkgmgr/actions/install/installers/nix/__init__.py
Normal file
4
src/pkgmgr/actions/install/installers/nix/__init__.py
Normal file
@@ -0,0 +1,4 @@
|
||||
from .installer import NixFlakeInstaller
|
||||
from .retry import RetryPolicy
|
||||
|
||||
__all__ = ["NixFlakeInstaller", "RetryPolicy"]
|
||||
168
src/pkgmgr/actions/install/installers/nix/installer.py
Normal file
168
src/pkgmgr/actions/install/installers/nix/installer.py
Normal file
@@ -0,0 +1,168 @@
|
||||
# src/pkgmgr/actions/install/installers/nix/installer.py
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import shutil
|
||||
from typing import List, Tuple, TYPE_CHECKING
|
||||
|
||||
from pkgmgr.actions.install.installers.base import BaseInstaller
|
||||
|
||||
from .profile import NixProfileInspector
|
||||
from .retry import GitHubRateLimitRetry, RetryPolicy
|
||||
from .runner import CommandRunner
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from pkgmgr.actions.install.context import RepoContext
|
||||
|
||||
class NixFlakeInstaller(BaseInstaller):
|
||||
layer = "nix"
|
||||
FLAKE_FILE = "flake.nix"
|
||||
|
||||
def __init__(self, policy: RetryPolicy | None = None) -> None:
|
||||
self._runner = CommandRunner()
|
||||
self._retry = GitHubRateLimitRetry(policy=policy)
|
||||
self._profile = NixProfileInspector()
|
||||
|
||||
# ------------------------------------------------------------------ #
|
||||
# Compatibility: supports()
|
||||
# ------------------------------------------------------------------ #
|
||||
|
||||
def supports(self, ctx: "RepoContext") -> bool:
|
||||
if os.environ.get("PKGMGR_DISABLE_NIX_FLAKE_INSTALLER") == "1":
|
||||
if not ctx.quiet:
|
||||
print("[INFO] PKGMGR_DISABLE_NIX_FLAKE_INSTALLER=1 – skipping NixFlakeInstaller.")
|
||||
return False
|
||||
|
||||
if shutil.which("nix") is None:
|
||||
return False
|
||||
|
||||
return os.path.exists(os.path.join(ctx.repo_dir, self.FLAKE_FILE))
|
||||
|
||||
# ------------------------------------------------------------------ #
|
||||
# Compatibility: output selection
|
||||
# ------------------------------------------------------------------ #
|
||||
|
||||
def _profile_outputs(self, ctx: "RepoContext") -> List[Tuple[str, bool]]:
|
||||
# (output_name, allow_failure)
|
||||
if ctx.identifier in {"pkgmgr", "package-manager"}:
|
||||
return [("pkgmgr", False), ("default", True)]
|
||||
return [("default", False)]
|
||||
|
||||
# ------------------------------------------------------------------ #
|
||||
# Compatibility: run()
|
||||
# ------------------------------------------------------------------ #
|
||||
|
||||
def run(self, ctx: "RepoContext") -> None:
|
||||
if not self.supports(ctx):
|
||||
return
|
||||
|
||||
outputs = self._profile_outputs(ctx)
|
||||
|
||||
if not ctx.quiet:
|
||||
print(
|
||||
"[nix] flake detected in "
|
||||
f"{ctx.identifier}, ensuring outputs: "
|
||||
+ ", ".join(name for name, _ in outputs)
|
||||
)
|
||||
|
||||
for output, allow_failure in outputs:
|
||||
if ctx.force_update:
|
||||
self._force_upgrade_output(ctx, output, allow_failure)
|
||||
else:
|
||||
self._install_only(ctx, output, allow_failure)
|
||||
|
||||
# ------------------------------------------------------------------ #
|
||||
# Core logic (unchanged semantics)
|
||||
# ------------------------------------------------------------------ #
|
||||
|
||||
def _installable(self, ctx: "RepoContext", output: str) -> str:
|
||||
return f"{ctx.repo_dir}#{output}"
|
||||
|
||||
def _install_only(self, ctx: "RepoContext", output: str, allow_failure: bool) -> None:
|
||||
install_cmd = f"nix profile install {self._installable(ctx, output)}"
|
||||
|
||||
if not ctx.quiet:
|
||||
print(f"[nix] install: {install_cmd}")
|
||||
|
||||
res = self._retry.run_with_retry(ctx, self._runner, install_cmd)
|
||||
|
||||
if res.returncode == 0:
|
||||
if not ctx.quiet:
|
||||
print(f"[nix] output '{output}' successfully installed.")
|
||||
return
|
||||
|
||||
if not ctx.quiet:
|
||||
print(
|
||||
f"[nix] install failed for '{output}' (exit {res.returncode}), "
|
||||
"trying index-based upgrade/remove+install..."
|
||||
)
|
||||
|
||||
indices = self._profile.find_installed_indices_for_output(ctx, self._runner, output)
|
||||
|
||||
upgraded = False
|
||||
for idx in indices:
|
||||
if self._upgrade_index(ctx, idx):
|
||||
upgraded = True
|
||||
if not ctx.quiet:
|
||||
print(f"[nix] output '{output}' successfully upgraded (index {idx}).")
|
||||
|
||||
if upgraded:
|
||||
return
|
||||
|
||||
if indices and not ctx.quiet:
|
||||
print(f"[nix] upgrade failed; removing indices {indices} and reinstalling '{output}'.")
|
||||
|
||||
for idx in indices:
|
||||
self._remove_index(ctx, idx)
|
||||
|
||||
final = self._runner.run(ctx, install_cmd, allow_failure=True)
|
||||
if final.returncode == 0:
|
||||
if not ctx.quiet:
|
||||
print(f"[nix] output '{output}' successfully re-installed.")
|
||||
return
|
||||
|
||||
print(f"[ERROR] Failed to install Nix flake output '{output}' (exit {final.returncode})")
|
||||
|
||||
if not allow_failure:
|
||||
raise SystemExit(final.returncode)
|
||||
|
||||
print(f"[WARNING] Continuing despite failure of optional output '{output}'.")
|
||||
|
||||
# ------------------------------------------------------------------ #
|
||||
# force_update path (unchanged semantics)
|
||||
# ------------------------------------------------------------------ #
|
||||
|
||||
def _force_upgrade_output(self, ctx: "RepoContext", output: str, allow_failure: bool) -> None:
|
||||
indices = self._profile.find_installed_indices_for_output(ctx, self._runner, output)
|
||||
|
||||
upgraded_any = False
|
||||
for idx in indices:
|
||||
if self._upgrade_index(ctx, idx):
|
||||
upgraded_any = True
|
||||
if not ctx.quiet:
|
||||
print(f"[nix] output '{output}' successfully upgraded (index {idx}).")
|
||||
|
||||
if upgraded_any:
|
||||
print(f"[nix] output '{output}' successfully upgraded.")
|
||||
return
|
||||
|
||||
if indices and not ctx.quiet:
|
||||
print(f"[nix] upgrade failed; removing indices {indices} and reinstalling '{output}'.")
|
||||
|
||||
for idx in indices:
|
||||
self._remove_index(ctx, idx)
|
||||
|
||||
self._install_only(ctx, output, allow_failure)
|
||||
|
||||
print(f"[nix] output '{output}' successfully upgraded.")
|
||||
|
||||
# ------------------------------------------------------------------ #
|
||||
# Helpers
|
||||
# ------------------------------------------------------------------ #
|
||||
|
||||
def _upgrade_index(self, ctx: "RepoContext", idx: int) -> bool:
|
||||
res = self._runner.run(ctx, f"nix profile upgrade --refresh {idx}", allow_failure=True)
|
||||
return res.returncode == 0
|
||||
|
||||
def _remove_index(self, ctx: "RepoContext", idx: int) -> None:
|
||||
self._runner.run(ctx, f"nix profile remove {idx}", allow_failure=True)
|
||||
71
src/pkgmgr/actions/install/installers/nix/profile.py
Normal file
71
src/pkgmgr/actions/install/installers/nix/profile.py
Normal file
@@ -0,0 +1,71 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from typing import Any, List, TYPE_CHECKING
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from pkgmgr.actions.install.context import RepoContext
|
||||
from .runner import CommandRunner
|
||||
|
||||
class NixProfileInspector:
|
||||
"""
|
||||
Reads and interprets `nix profile list --json` and provides helpers for
|
||||
finding indices matching a given output name.
|
||||
"""
|
||||
|
||||
def find_installed_indices_for_output(self, ctx: "RepoContext", runner: "CommandRunner", output: str) -> List[int]:
|
||||
res = runner.run(ctx, "nix profile list --json", allow_failure=True)
|
||||
if res.returncode != 0:
|
||||
return []
|
||||
|
||||
try:
|
||||
data = json.loads(res.stdout or "{}")
|
||||
except json.JSONDecodeError:
|
||||
return []
|
||||
|
||||
indices: List[int] = []
|
||||
|
||||
elements = data.get("elements")
|
||||
if isinstance(elements, dict):
|
||||
for idx_str, elem in elements.items():
|
||||
try:
|
||||
idx = int(idx_str)
|
||||
except (TypeError, ValueError):
|
||||
continue
|
||||
if self._element_matches_output(elem, output):
|
||||
indices.append(idx)
|
||||
return sorted(indices)
|
||||
|
||||
if isinstance(elements, list):
|
||||
for elem in elements:
|
||||
idx = elem.get("index") if isinstance(elem, dict) else None
|
||||
if isinstance(idx, int) and self._element_matches_output(elem, output):
|
||||
indices.append(idx)
|
||||
return sorted(indices)
|
||||
|
||||
return []
|
||||
|
||||
@staticmethod
|
||||
def element_matches_output(elem: Any, output: str) -> bool:
|
||||
return NixProfileInspector._element_matches_output(elem, output)
|
||||
|
||||
@staticmethod
|
||||
def _element_matches_output(elem: Any, output: str) -> bool:
|
||||
out = (output or "").strip()
|
||||
if not out or not isinstance(elem, dict):
|
||||
return False
|
||||
|
||||
candidates: List[str] = []
|
||||
for k in ("attrPath", "originalUrl", "url", "storePath", "name"):
|
||||
v = elem.get(k)
|
||||
if isinstance(v, str) and v:
|
||||
candidates.append(v)
|
||||
|
||||
for c in candidates:
|
||||
if c == out:
|
||||
return True
|
||||
if f"#{out}" in c:
|
||||
return True
|
||||
|
||||
return False
|
||||
87
src/pkgmgr/actions/install/installers/nix/retry.py
Normal file
87
src/pkgmgr/actions/install/installers/nix/retry.py
Normal file
@@ -0,0 +1,87 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import random
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from typing import Iterable, TYPE_CHECKING
|
||||
|
||||
from .types import RunResult
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from pkgmgr.actions.install.context import RepoContext
|
||||
from .runner import CommandRunner
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class RetryPolicy:
|
||||
max_attempts: int = 7
|
||||
base_delay_seconds: int = 30
|
||||
jitter_seconds_min: int = 0
|
||||
jitter_seconds_max: int = 60
|
||||
|
||||
|
||||
class GitHubRateLimitRetry:
|
||||
"""
|
||||
Retries nix install commands only when the error looks like a GitHub API rate limit (HTTP 403).
|
||||
Backoff: Fibonacci(base, base, ...) + random jitter.
|
||||
"""
|
||||
|
||||
def __init__(self, policy: RetryPolicy | None = None) -> None:
|
||||
self._policy = policy or RetryPolicy()
|
||||
|
||||
def run_with_retry(
|
||||
self,
|
||||
ctx: "RepoContext",
|
||||
runner: "CommandRunner",
|
||||
install_cmd: str,
|
||||
) -> RunResult:
|
||||
quiet = bool(getattr(ctx, "quiet", False))
|
||||
delays = list(self._fibonacci_backoff(self._policy.base_delay_seconds, self._policy.max_attempts))
|
||||
|
||||
last: RunResult | None = None
|
||||
|
||||
for attempt, base_delay in enumerate(delays, start=1):
|
||||
if not quiet:
|
||||
print(f"[nix] attempt {attempt}/{self._policy.max_attempts}: {install_cmd}")
|
||||
|
||||
res = runner.run(ctx, install_cmd, allow_failure=True)
|
||||
last = res
|
||||
|
||||
if res.returncode == 0:
|
||||
return res
|
||||
|
||||
combined = f"{res.stdout}\n{res.stderr}"
|
||||
if not self._is_github_rate_limit_error(combined):
|
||||
return res
|
||||
|
||||
if attempt >= self._policy.max_attempts:
|
||||
break
|
||||
|
||||
jitter = random.randint(self._policy.jitter_seconds_min, self._policy.jitter_seconds_max)
|
||||
wait_time = base_delay + jitter
|
||||
|
||||
if not quiet:
|
||||
print(
|
||||
"[nix] GitHub rate limit detected (403). "
|
||||
f"Retrying in {wait_time}s (base={base_delay}s, jitter={jitter}s)..."
|
||||
)
|
||||
|
||||
time.sleep(wait_time)
|
||||
|
||||
return last if last is not None else RunResult(returncode=1, stdout="", stderr="nix install retry failed")
|
||||
|
||||
@staticmethod
|
||||
def _is_github_rate_limit_error(text: str) -> bool:
|
||||
t = (text or "").lower()
|
||||
return (
|
||||
"http error 403" in t
|
||||
or "rate limit exceeded" in t
|
||||
or "github api rate limit" in t
|
||||
or "api rate limit exceeded" in t
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _fibonacci_backoff(base: int, attempts: int) -> Iterable[int]:
|
||||
a, b = base, base
|
||||
for _ in range(max(1, attempts)):
|
||||
yield a
|
||||
a, b = b, a + b
|
||||
64
src/pkgmgr/actions/install/installers/nix/runner.py
Normal file
64
src/pkgmgr/actions/install/installers/nix/runner.py
Normal file
@@ -0,0 +1,64 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import subprocess
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from .types import RunResult
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from pkgmgr.actions.install.context import RepoContext
|
||||
|
||||
class CommandRunner:
|
||||
"""
|
||||
Executes commands (shell=True) inside a repository directory (if provided).
|
||||
Supports preview mode and compact failure output logging.
|
||||
"""
|
||||
|
||||
def run(self, ctx: "RepoContext", cmd: str, allow_failure: bool) -> RunResult:
|
||||
repo_dir = getattr(ctx, "repo_dir", None) or getattr(ctx, "repo_path", None)
|
||||
preview = bool(getattr(ctx, "preview", False))
|
||||
quiet = bool(getattr(ctx, "quiet", False))
|
||||
|
||||
if preview:
|
||||
if not quiet:
|
||||
print(f"[preview] {cmd}")
|
||||
return RunResult(returncode=0, stdout="", stderr="")
|
||||
|
||||
try:
|
||||
p = subprocess.run(
|
||||
cmd,
|
||||
shell=True,
|
||||
cwd=repo_dir,
|
||||
check=False,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
text=True,
|
||||
)
|
||||
except Exception as e:
|
||||
if not allow_failure:
|
||||
raise
|
||||
return RunResult(returncode=1, stdout="", stderr=str(e))
|
||||
|
||||
res = RunResult(returncode=p.returncode, stdout=p.stdout or "", stderr=p.stderr or "")
|
||||
|
||||
if res.returncode != 0 and not quiet:
|
||||
self._print_compact_failure(res)
|
||||
|
||||
if res.returncode != 0 and not allow_failure:
|
||||
raise SystemExit(res.returncode)
|
||||
|
||||
return res
|
||||
|
||||
@staticmethod
|
||||
def _print_compact_failure(res: RunResult) -> None:
|
||||
out = (res.stdout or "").strip()
|
||||
err = (res.stderr or "").strip()
|
||||
|
||||
if out:
|
||||
print("[nix] stdout (last lines):")
|
||||
print("\n".join(out.splitlines()[-20:]))
|
||||
|
||||
if err:
|
||||
print("[nix] stderr (last lines):")
|
||||
print("\n".join(err.splitlines()[-40:]))
|
||||
10
src/pkgmgr/actions/install/installers/nix/types.py
Normal file
10
src/pkgmgr/actions/install/installers/nix/types.py
Normal file
@@ -0,0 +1,10 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class RunResult:
|
||||
returncode: int
|
||||
stdout: str
|
||||
stderr: str
|
||||
@@ -1,238 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
from typing import TYPE_CHECKING, List, Tuple
|
||||
|
||||
from pkgmgr.actions.install.installers.base import BaseInstaller
|
||||
from pkgmgr.core.command.run import run_command
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from pkgmgr.actions.install.context import RepoContext
|
||||
|
||||
|
||||
class NixFlakeInstaller(BaseInstaller):
|
||||
layer = "nix"
|
||||
FLAKE_FILE = "flake.nix"
|
||||
|
||||
def supports(self, ctx: "RepoContext") -> bool:
|
||||
if os.environ.get("PKGMGR_DISABLE_NIX_FLAKE_INSTALLER") == "1":
|
||||
if not ctx.quiet:
|
||||
print("[INFO] PKGMGR_DISABLE_NIX_FLAKE_INSTALLER=1 – skipping NixFlakeInstaller.")
|
||||
return False
|
||||
|
||||
if shutil.which("nix") is None:
|
||||
return False
|
||||
|
||||
return os.path.exists(os.path.join(ctx.repo_dir, self.FLAKE_FILE))
|
||||
|
||||
def _profile_outputs(self, ctx: "RepoContext") -> List[Tuple[str, bool]]:
|
||||
# (output_name, allow_failure)
|
||||
if ctx.identifier in {"pkgmgr", "package-manager"}:
|
||||
return [("pkgmgr", False), ("default", True)]
|
||||
return [("default", False)]
|
||||
|
||||
def _installable(self, ctx: "RepoContext", output: str) -> str:
|
||||
return f"{ctx.repo_dir}#{output}"
|
||||
|
||||
def _run(self, ctx: "RepoContext", cmd: str, allow_failure: bool = True):
|
||||
return run_command(
|
||||
cmd,
|
||||
cwd=ctx.repo_dir,
|
||||
preview=ctx.preview,
|
||||
allow_failure=allow_failure,
|
||||
)
|
||||
|
||||
def _profile_list_json(self, ctx: "RepoContext") -> dict:
|
||||
"""
|
||||
Read current Nix profile entries as JSON (best-effort).
|
||||
|
||||
NOTE: Nix versions differ:
|
||||
- Newer: {"elements": [ { "index": 0, "attrPath": "...", ... }, ... ]}
|
||||
- Older: {"elements": [ "nixpkgs#hello", ... ]} (strings)
|
||||
|
||||
We return {} on failure or in preview mode.
|
||||
"""
|
||||
if ctx.preview:
|
||||
return {}
|
||||
|
||||
proc = subprocess.run(
|
||||
["nix", "profile", "list", "--json"],
|
||||
check=False,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT,
|
||||
text=True,
|
||||
env=os.environ.copy(),
|
||||
)
|
||||
if proc.returncode != 0:
|
||||
return {}
|
||||
|
||||
try:
|
||||
return json.loads(proc.stdout or "{}")
|
||||
except json.JSONDecodeError:
|
||||
return {}
|
||||
|
||||
def _find_installed_indices_for_output(self, ctx: "RepoContext", output: str) -> List[int]:
|
||||
"""
|
||||
Find installed profile indices for a given output.
|
||||
|
||||
Works across Nix JSON variants:
|
||||
- If elements are dicts: we can extract indices.
|
||||
- If elements are strings: we cannot extract indices -> return [].
|
||||
"""
|
||||
data = self._profile_list_json(ctx)
|
||||
elements = data.get("elements", []) or []
|
||||
|
||||
matches: List[int] = []
|
||||
|
||||
for el in elements:
|
||||
# Legacy JSON format: plain strings -> no index information
|
||||
if not isinstance(el, dict):
|
||||
continue
|
||||
|
||||
idx = el.get("index")
|
||||
if idx is None:
|
||||
continue
|
||||
|
||||
attr_path = el.get("attrPath") or el.get("attr_path") or ""
|
||||
pname = el.get("pname") or ""
|
||||
name = el.get("name") or ""
|
||||
|
||||
if attr_path == output:
|
||||
matches.append(int(idx))
|
||||
continue
|
||||
|
||||
if pname == output or name == output:
|
||||
matches.append(int(idx))
|
||||
continue
|
||||
|
||||
if isinstance(attr_path, str) and attr_path.endswith(f".{output}"):
|
||||
matches.append(int(idx))
|
||||
continue
|
||||
|
||||
return matches
|
||||
|
||||
def _upgrade_index(self, ctx: "RepoContext", index: int) -> bool:
|
||||
cmd = f"nix profile upgrade --refresh {index}"
|
||||
if not ctx.quiet:
|
||||
print(f"[nix] upgrade: {cmd}")
|
||||
res = self._run(ctx, cmd, allow_failure=True)
|
||||
return res.returncode == 0
|
||||
|
||||
def _remove_index(self, ctx: "RepoContext", index: int) -> None:
|
||||
cmd = f"nix profile remove {index}"
|
||||
if not ctx.quiet:
|
||||
print(f"[nix] remove: {cmd}")
|
||||
self._run(ctx, cmd, allow_failure=True)
|
||||
|
||||
def _install_only(self, ctx: "RepoContext", output: str, allow_failure: bool) -> None:
|
||||
"""
|
||||
Install output; on failure, try index-based upgrade/remove+install if possible.
|
||||
"""
|
||||
installable = self._installable(ctx, output)
|
||||
install_cmd = f"nix profile install {installable}"
|
||||
|
||||
if not ctx.quiet:
|
||||
print(f"[nix] install: {install_cmd}")
|
||||
|
||||
res = self._run(ctx, install_cmd, allow_failure=True)
|
||||
if res.returncode == 0:
|
||||
if not ctx.quiet:
|
||||
print(f"[nix] output '{output}' successfully installed.")
|
||||
return
|
||||
|
||||
if not ctx.quiet:
|
||||
print(
|
||||
f"[nix] install failed for '{output}' (exit {res.returncode}), "
|
||||
"trying index-based upgrade/remove+install..."
|
||||
)
|
||||
|
||||
indices = self._find_installed_indices_for_output(ctx, output)
|
||||
|
||||
# 1) Try upgrading existing indices (only possible on newer JSON format)
|
||||
upgraded = False
|
||||
for idx in indices:
|
||||
if self._upgrade_index(ctx, idx):
|
||||
upgraded = True
|
||||
if not ctx.quiet:
|
||||
print(f"[nix] output '{output}' successfully upgraded (index {idx}).")
|
||||
|
||||
if upgraded:
|
||||
return
|
||||
|
||||
# 2) Remove matching indices and retry install
|
||||
if indices and not ctx.quiet:
|
||||
print(f"[nix] upgrade failed; removing indices {indices} and reinstalling '{output}'.")
|
||||
|
||||
for idx in indices:
|
||||
self._remove_index(ctx, idx)
|
||||
|
||||
final = self._run(ctx, install_cmd, allow_failure=True)
|
||||
if final.returncode == 0:
|
||||
if not ctx.quiet:
|
||||
print(f"[nix] output '{output}' successfully re-installed.")
|
||||
return
|
||||
|
||||
msg = f"[ERROR] Failed to install Nix flake output '{output}' (exit {final.returncode})"
|
||||
print(msg)
|
||||
|
||||
if not allow_failure:
|
||||
raise SystemExit(final.returncode)
|
||||
|
||||
print(f"[WARNING] Continuing despite failure of optional output '{output}'.")
|
||||
|
||||
def _force_upgrade_output(self, ctx: "RepoContext", output: str, allow_failure: bool) -> None:
|
||||
"""
|
||||
force_update path:
|
||||
- Prefer upgrading existing entries via indices (if we can discover them).
|
||||
- If no indices (legacy JSON) or upgrade fails, fall back to install-only logic.
|
||||
"""
|
||||
indices = self._find_installed_indices_for_output(ctx, output)
|
||||
|
||||
upgraded_any = False
|
||||
for idx in indices:
|
||||
if self._upgrade_index(ctx, idx):
|
||||
upgraded_any = True
|
||||
if not ctx.quiet:
|
||||
print(f"[nix] output '{output}' successfully upgraded (index {idx}).")
|
||||
|
||||
if upgraded_any:
|
||||
# Make upgrades visible to tests
|
||||
print(f"[nix] output '{output}' successfully upgraded.")
|
||||
return
|
||||
|
||||
if indices and not ctx.quiet:
|
||||
print(f"[nix] upgrade failed; removing indices {indices} and reinstalling '{output}'.")
|
||||
|
||||
for idx in indices:
|
||||
self._remove_index(ctx, idx)
|
||||
|
||||
# Ensure installed (includes its own fallback logic)
|
||||
self._install_only(ctx, output, allow_failure)
|
||||
|
||||
# Make upgrades visible to tests (semantic: update requested)
|
||||
print(f"[nix] output '{output}' successfully upgraded.")
|
||||
|
||||
def run(self, ctx: "RepoContext") -> None:
|
||||
if not self.supports(ctx):
|
||||
return
|
||||
|
||||
outputs = self._profile_outputs(ctx)
|
||||
|
||||
if not ctx.quiet:
|
||||
print(
|
||||
"[nix] flake detected in "
|
||||
f"{ctx.identifier}, ensuring outputs: "
|
||||
+ ", ".join(name for name, _ in outputs)
|
||||
)
|
||||
|
||||
for output, allow_failure in outputs:
|
||||
if ctx.force_update:
|
||||
self._force_upgrade_output(ctx, output, allow_failure)
|
||||
else:
|
||||
self._install_only(ctx, output, allow_failure)
|
||||
@@ -9,8 +9,13 @@ from pkgmgr.core.repository.dir import get_repo_dir
|
||||
from pkgmgr.core.repository.identifier import get_repo_identifier
|
||||
from pkgmgr.core.git import get_tags
|
||||
from pkgmgr.core.version.semver import SemVer, find_latest_version
|
||||
from pkgmgr.core.version.installed import (
|
||||
get_installed_python_version,
|
||||
get_installed_nix_profile_version,
|
||||
)
|
||||
from pkgmgr.core.version.source import (
|
||||
read_pyproject_version,
|
||||
read_pyproject_project_name,
|
||||
read_flake_version,
|
||||
read_pkgbuild_version,
|
||||
read_debian_changelog_version,
|
||||
@@ -18,10 +23,54 @@ from pkgmgr.core.version.source import (
|
||||
read_ansible_galaxy_version,
|
||||
)
|
||||
|
||||
|
||||
Repository = Dict[str, Any]
|
||||
|
||||
|
||||
def _print_pkgmgr_self_version() -> None:
|
||||
"""
|
||||
Print version information for pkgmgr itself (installed env + nix profile),
|
||||
used when no repository is selected (e.g. user is not inside a repo).
|
||||
"""
|
||||
print("pkgmgr version info")
|
||||
print("====================")
|
||||
print("\nRepository: <pkgmgr self>")
|
||||
print("----------------------------------------")
|
||||
|
||||
# Common distribution/module naming variants.
|
||||
python_candidates = [
|
||||
"package-manager", # PyPI dist name in your project
|
||||
"package_manager", # module-ish variant
|
||||
"pkgmgr", # console/alias-ish
|
||||
]
|
||||
nix_candidates = [
|
||||
"pkgmgr",
|
||||
"package-manager",
|
||||
]
|
||||
|
||||
installed_python = get_installed_python_version(*python_candidates)
|
||||
installed_nix = get_installed_nix_profile_version(*nix_candidates)
|
||||
|
||||
if installed_python:
|
||||
print(
|
||||
f"Installed (Python env): {installed_python.version} "
|
||||
f"(dist: {installed_python.name})"
|
||||
)
|
||||
else:
|
||||
print("Installed (Python env): <not installed>")
|
||||
|
||||
if installed_nix:
|
||||
print(
|
||||
f"Installed (Nix profile): {installed_nix.version} "
|
||||
f"(match: {installed_nix.name})"
|
||||
)
|
||||
else:
|
||||
print("Installed (Nix profile): <not installed>")
|
||||
|
||||
# Helpful context for debugging "why do versions differ?"
|
||||
print(f"Python executable: {sys.executable}")
|
||||
print(f"Python prefix: {sys.prefix}")
|
||||
|
||||
|
||||
def handle_version(
|
||||
args,
|
||||
ctx: CLIContext,
|
||||
@@ -30,20 +79,39 @@ def handle_version(
|
||||
"""
|
||||
Handle the 'version' command.
|
||||
|
||||
Shows version information from various sources (git tags, pyproject,
|
||||
flake.nix, PKGBUILD, debian, spec, Ansible Galaxy).
|
||||
"""
|
||||
Shows version information from:
|
||||
- Git tags
|
||||
- packaging metadata
|
||||
- installed Python environment
|
||||
- installed Nix profile
|
||||
|
||||
repo_list = selected
|
||||
if not repo_list:
|
||||
print("No repositories selected for version.")
|
||||
sys.exit(1)
|
||||
Special case:
|
||||
- If no repositories are selected (e.g. not in a repo and no identifiers),
|
||||
print pkgmgr's own installed versions instead of exiting with an error.
|
||||
"""
|
||||
if not selected:
|
||||
_print_pkgmgr_self_version()
|
||||
return
|
||||
|
||||
print("pkgmgr version info")
|
||||
print("====================")
|
||||
|
||||
for repo in repo_list:
|
||||
# Resolve repository directory
|
||||
for repo in selected:
|
||||
identifier = get_repo_identifier(repo, ctx.all_repositories)
|
||||
|
||||
python_candidates: list[str] = []
|
||||
nix_candidates: list[str] = [identifier]
|
||||
|
||||
for key in ("pypi", "pip", "python_package", "distribution", "package"):
|
||||
val = repo.get(key)
|
||||
if isinstance(val, str) and val.strip():
|
||||
python_candidates.append(val.strip())
|
||||
|
||||
python_candidates.append(identifier)
|
||||
|
||||
installed_python = get_installed_python_version(*python_candidates)
|
||||
installed_nix = get_installed_nix_profile_version(*nix_candidates)
|
||||
|
||||
repo_dir = repo.get("directory")
|
||||
if not repo_dir:
|
||||
try:
|
||||
@@ -51,51 +119,79 @@ def handle_version(
|
||||
except Exception:
|
||||
repo_dir = None
|
||||
|
||||
# If no local clone exists, skip gracefully with info message
|
||||
if not repo_dir or not os.path.isdir(repo_dir):
|
||||
identifier = get_repo_identifier(repo, ctx.all_repositories)
|
||||
print(f"\nRepository: {identifier}")
|
||||
print("----------------------------------------")
|
||||
print(
|
||||
"[INFO] Skipped: repository directory does not exist "
|
||||
"locally, version detection is not possible."
|
||||
"[INFO] Skipped: repository directory does not exist locally, "
|
||||
"version detection is not possible."
|
||||
)
|
||||
|
||||
if installed_python:
|
||||
print(
|
||||
f"Installed (Python env): {installed_python.version} "
|
||||
f"(dist: {installed_python.name})"
|
||||
)
|
||||
else:
|
||||
print("Installed (Python env): <not installed>")
|
||||
|
||||
if installed_nix:
|
||||
print(
|
||||
f"Installed (Nix profile): {installed_nix.version} "
|
||||
f"(match: {installed_nix.name})"
|
||||
)
|
||||
else:
|
||||
print("Installed (Nix profile): <not installed>")
|
||||
|
||||
continue
|
||||
|
||||
print(f"\nRepository: {repo_dir}")
|
||||
print("----------------------------------------")
|
||||
|
||||
# 1) Git tags (SemVer)
|
||||
try:
|
||||
tags = get_tags(cwd=repo_dir)
|
||||
except Exception as exc:
|
||||
print(f"[ERROR] Could not read git tags: {exc}")
|
||||
tags = []
|
||||
|
||||
latest_tag_info: Optional[Tuple[str, SemVer]]
|
||||
latest_tag_info = find_latest_version(tags) if tags else None
|
||||
latest_tag_info: Optional[Tuple[str, SemVer]] = (
|
||||
find_latest_version(tags) if tags else None
|
||||
)
|
||||
|
||||
if latest_tag_info is None:
|
||||
latest_tag_str = None
|
||||
latest_ver = None
|
||||
if latest_tag_info:
|
||||
tag, ver = latest_tag_info
|
||||
print(f"Git (latest SemVer tag): {tag} (parsed: {ver})")
|
||||
else:
|
||||
latest_tag_str, latest_ver = latest_tag_info
|
||||
print("Git (latest SemVer tag): <none found>")
|
||||
|
||||
# 2) Packaging / metadata sources
|
||||
pyproject_version = read_pyproject_version(repo_dir)
|
||||
pyproject_name = read_pyproject_project_name(repo_dir)
|
||||
flake_version = read_flake_version(repo_dir)
|
||||
pkgbuild_version = read_pkgbuild_version(repo_dir)
|
||||
debian_version = read_debian_changelog_version(repo_dir)
|
||||
spec_version = read_spec_version(repo_dir)
|
||||
ansible_version = read_ansible_galaxy_version(repo_dir)
|
||||
|
||||
# 3) Print version summary
|
||||
if latest_ver is not None:
|
||||
if pyproject_name:
|
||||
installed_python = get_installed_python_version(
|
||||
pyproject_name, *python_candidates
|
||||
)
|
||||
|
||||
if installed_python:
|
||||
print(
|
||||
f"Git (latest SemVer tag): {latest_tag_str} (parsed: {latest_ver})"
|
||||
f"Installed (Python env): {installed_python.version} "
|
||||
f"(dist: {installed_python.name})"
|
||||
)
|
||||
else:
|
||||
print("Git (latest SemVer tag): <none found>")
|
||||
print("Installed (Python env): <not installed>")
|
||||
|
||||
if installed_nix:
|
||||
print(
|
||||
f"Installed (Nix profile): {installed_nix.version} "
|
||||
f"(match: {installed_nix.name})"
|
||||
)
|
||||
else:
|
||||
print("Installed (Nix profile): <not installed>")
|
||||
|
||||
print(f"pyproject.toml: {pyproject_version or '<not found>'}")
|
||||
print(f"flake.nix: {flake_version or '<not found>'}")
|
||||
@@ -104,15 +200,16 @@ def handle_version(
|
||||
print(f"package-manager.spec: {spec_version or '<not found>'}")
|
||||
print(f"Ansible Galaxy meta: {ansible_version or '<not found>'}")
|
||||
|
||||
# 4) Consistency hint (Git tag vs. pyproject)
|
||||
if latest_ver is not None and pyproject_version is not None:
|
||||
if latest_tag_info and pyproject_version:
|
||||
try:
|
||||
file_ver = SemVer.parse(pyproject_version)
|
||||
if file_ver != latest_ver:
|
||||
if file_ver != latest_tag_info[1]:
|
||||
print(
|
||||
f"[WARN] Version mismatch: Git={latest_ver}, pyproject={file_ver}"
|
||||
f"[WARN] Version mismatch: "
|
||||
f"Git={latest_tag_info[1]}, pyproject={file_ver}"
|
||||
)
|
||||
except ValueError:
|
||||
print(
|
||||
f"[WARN] pyproject version {pyproject_version!r} is not valid SemVer."
|
||||
f"[WARN] pyproject version {pyproject_version!r} "
|
||||
f"is not valid SemVer."
|
||||
)
|
||||
|
||||
168
src/pkgmgr/core/version/installed.py
Normal file
168
src/pkgmgr/core/version/installed.py
Normal file
@@ -0,0 +1,168 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import re
|
||||
import shutil
|
||||
import subprocess
|
||||
from dataclasses import dataclass
|
||||
from typing import Iterable, Optional, Tuple
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class InstalledVersion:
|
||||
"""
|
||||
Represents a resolved installed version and the matched name.
|
||||
"""
|
||||
name: str
|
||||
version: str
|
||||
|
||||
|
||||
def _normalize(name: str) -> str:
|
||||
return re.sub(r"[-_.]+", "-", (name or "").strip()).lower()
|
||||
|
||||
|
||||
def _unique_candidates(names: Iterable[str]) -> list[str]:
|
||||
seen: set[str] = set()
|
||||
out: list[str] = []
|
||||
for n in names:
|
||||
if not n:
|
||||
continue
|
||||
key = _normalize(n)
|
||||
if key in seen:
|
||||
continue
|
||||
seen.add(key)
|
||||
out.append(n)
|
||||
return out
|
||||
|
||||
|
||||
def get_installed_python_version(*candidates: str) -> Optional[InstalledVersion]:
|
||||
"""
|
||||
Detect installed Python package version in the CURRENT Python environment.
|
||||
|
||||
Strategy:
|
||||
1) Exact normalized match using importlib.metadata.version()
|
||||
2) Substring fallback by scanning installed distributions
|
||||
"""
|
||||
try:
|
||||
from importlib import metadata as importlib_metadata
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
candidates = _unique_candidates(candidates)
|
||||
|
||||
expanded: list[str] = []
|
||||
for c in candidates:
|
||||
n = _normalize(c)
|
||||
expanded.extend([c, n, n.replace("-", "_"), n.replace("-", ".")])
|
||||
expanded = _unique_candidates(expanded)
|
||||
|
||||
# 1) Direct queries first (fast path)
|
||||
for name in expanded:
|
||||
try:
|
||||
version = importlib_metadata.version(name)
|
||||
return InstalledVersion(name=name, version=version)
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
# 2) Fallback: scan distributions (last resort)
|
||||
try:
|
||||
dists = importlib_metadata.distributions()
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
norm_candidates = {_normalize(c) for c in candidates}
|
||||
|
||||
for dist in dists:
|
||||
dist_name = dist.metadata.get("Name", "") or ""
|
||||
norm_dist = _normalize(dist_name)
|
||||
for c in norm_candidates:
|
||||
if c and (c in norm_dist or norm_dist in c):
|
||||
ver = getattr(dist, "version", None)
|
||||
if ver:
|
||||
return InstalledVersion(name=dist_name, version=ver)
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def _run_nix(args: list[str]) -> Tuple[int, str, str]:
|
||||
p = subprocess.run(
|
||||
args,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
text=True,
|
||||
check=False,
|
||||
)
|
||||
return p.returncode, p.stdout or "", p.stderr or ""
|
||||
|
||||
|
||||
def _extract_version_from_store_path(path: str) -> Optional[str]:
|
||||
if not path:
|
||||
return None
|
||||
base = path.rstrip("/").split("/")[-1]
|
||||
if "-" not in base:
|
||||
return None
|
||||
tail = base.split("-")[-1]
|
||||
if re.match(r"\d+(\.\d+){0,3}([a-z0-9+._-]*)?$", tail, re.I):
|
||||
return tail
|
||||
return None
|
||||
|
||||
|
||||
def get_installed_nix_profile_version(*candidates: str) -> Optional[InstalledVersion]:
|
||||
"""
|
||||
Detect installed version from the current Nix profile.
|
||||
|
||||
Strategy:
|
||||
1) JSON output (exact normalized match)
|
||||
2) Text fallback (substring)
|
||||
"""
|
||||
if shutil.which("nix") is None:
|
||||
return None
|
||||
|
||||
candidates = _unique_candidates(candidates)
|
||||
if not candidates:
|
||||
return None
|
||||
|
||||
norm_candidates = {_normalize(c) for c in candidates}
|
||||
|
||||
# Preferred: JSON output
|
||||
rc, out, _ = _run_nix(["nix", "profile", "list", "--json"])
|
||||
if rc == 0 and out.strip():
|
||||
try:
|
||||
data = json.loads(out)
|
||||
elements = data.get("elements") or data.get("items") or {}
|
||||
if isinstance(elements, dict):
|
||||
for elem in elements.values():
|
||||
if not isinstance(elem, dict):
|
||||
continue
|
||||
name = (elem.get("name") or elem.get("pname") or "").strip()
|
||||
version = (elem.get("version") or "").strip()
|
||||
norm_name = _normalize(name)
|
||||
|
||||
if norm_name in norm_candidates:
|
||||
if version:
|
||||
return InstalledVersion(name=name, version=version)
|
||||
for sp in elem.get("storePaths", []) or []:
|
||||
guess = _extract_version_from_store_path(sp)
|
||||
if guess:
|
||||
return InstalledVersion(name=name, version=guess)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Fallback: text mode
|
||||
rc, out, _ = _run_nix(["nix", "profile", "list"])
|
||||
if rc != 0:
|
||||
return None
|
||||
|
||||
for line in out.splitlines():
|
||||
norm_line = _normalize(line)
|
||||
for c in norm_candidates:
|
||||
if c in norm_line:
|
||||
m = re.search(r"\b\d+(\.\d+){0,3}[a-z0-9+._-]*\b", line, re.I)
|
||||
if m:
|
||||
return InstalledVersion(name=c, version=m.group(0))
|
||||
if "/nix/store/" in line:
|
||||
guess = _extract_version_from_store_path(line.split()[-1])
|
||||
if guess:
|
||||
return InstalledVersion(name=c, version=guess)
|
||||
|
||||
return None
|
||||
@@ -1,21 +1,3 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
"""
|
||||
Helpers to extract version information from various packaging files.
|
||||
|
||||
All functions take a repository directory and return either a version
|
||||
string or None if the corresponding file or version field is missing.
|
||||
|
||||
Supported sources:
|
||||
- pyproject.toml (PEP 621, [project].version)
|
||||
- flake.nix (version = "X.Y.Z";)
|
||||
- PKGBUILD (pkgver / pkgrel)
|
||||
- debian/changelog (first entry line: package (version) ...)
|
||||
- RPM spec file (package-manager.spec: Version / Release)
|
||||
- Ansible Galaxy (galaxy.yml or meta/main.yml)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
@@ -30,46 +12,61 @@ def read_pyproject_version(repo_dir: str) -> Optional[str]:
|
||||
Read the version from pyproject.toml in repo_dir, if present.
|
||||
|
||||
Expects a PEP 621-style [project] table with a 'version' field.
|
||||
Returns the version string or None.
|
||||
"""
|
||||
path = os.path.join(repo_dir, "pyproject.toml")
|
||||
if not os.path.exists(path):
|
||||
if not os.path.isfile(path):
|
||||
return None
|
||||
|
||||
try:
|
||||
try:
|
||||
import tomllib # Python 3.11+
|
||||
except ModuleNotFoundError: # pragma: no cover
|
||||
tomllib = None
|
||||
|
||||
if tomllib is None:
|
||||
return None
|
||||
import tomllib # Python 3.11+
|
||||
except Exception:
|
||||
import tomli as tomllib # type: ignore
|
||||
|
||||
try:
|
||||
with open(path, "rb") as f:
|
||||
data = tomllib.load(f)
|
||||
|
||||
project = data.get("project", {})
|
||||
if isinstance(project, dict):
|
||||
version = project.get("version")
|
||||
if isinstance(version, str):
|
||||
return version.strip() or None
|
||||
project = data.get("project") or {}
|
||||
version = project.get("version")
|
||||
return str(version).strip() if version else None
|
||||
except Exception:
|
||||
# Intentionally swallow errors and fall back to None.
|
||||
return None
|
||||
|
||||
return None
|
||||
|
||||
def read_pyproject_project_name(repo_dir: str) -> Optional[str]:
|
||||
"""
|
||||
Read distribution name from pyproject.toml ([project].name).
|
||||
|
||||
This is required to correctly resolve installed Python package
|
||||
versions via importlib.metadata.
|
||||
"""
|
||||
path = os.path.join(repo_dir, "pyproject.toml")
|
||||
if not os.path.isfile(path):
|
||||
return None
|
||||
|
||||
try:
|
||||
import tomllib # Python 3.11+
|
||||
except Exception:
|
||||
import tomli as tomllib # type: ignore
|
||||
|
||||
try:
|
||||
with open(path, "rb") as f:
|
||||
data = tomllib.load(f)
|
||||
project = data.get("project") or {}
|
||||
name = project.get("name")
|
||||
return str(name).strip() if name else None
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
def read_flake_version(repo_dir: str) -> Optional[str]:
|
||||
"""
|
||||
Read the version from flake.nix in repo_dir, if present.
|
||||
|
||||
Looks for a line like:
|
||||
version = "1.2.3";
|
||||
and returns the string inside the quotes.
|
||||
Looks for:
|
||||
version = "X.Y.Z";
|
||||
"""
|
||||
path = os.path.join(repo_dir, "flake.nix")
|
||||
if not os.path.exists(path):
|
||||
if not os.path.isfile(path):
|
||||
return None
|
||||
|
||||
try:
|
||||
@@ -81,22 +78,21 @@ def read_flake_version(repo_dir: str) -> Optional[str]:
|
||||
match = re.search(r'version\s*=\s*"([^"]+)"', text)
|
||||
if not match:
|
||||
return None
|
||||
version = match.group(1).strip()
|
||||
return version or None
|
||||
|
||||
return match.group(1).strip() or None
|
||||
|
||||
|
||||
def read_pkgbuild_version(repo_dir: str) -> Optional[str]:
|
||||
"""
|
||||
Read the version from PKGBUILD in repo_dir, if present.
|
||||
Read the version from PKGBUILD in repo_dir.
|
||||
|
||||
Expects:
|
||||
pkgver=1.2.3
|
||||
pkgrel=1
|
||||
|
||||
Returns either "1.2.3-1" (if both are present) or just "1.2.3".
|
||||
Combines pkgver and pkgrel if both exist:
|
||||
pkgver=1.2.3
|
||||
pkgrel=1
|
||||
-> 1.2.3-1
|
||||
"""
|
||||
path = os.path.join(repo_dir, "PKGBUILD")
|
||||
if not os.path.exists(path):
|
||||
if not os.path.isfile(path):
|
||||
return None
|
||||
|
||||
try:
|
||||
@@ -121,15 +117,13 @@ def read_pkgbuild_version(repo_dir: str) -> Optional[str]:
|
||||
|
||||
def read_debian_changelog_version(repo_dir: str) -> Optional[str]:
|
||||
"""
|
||||
Read the latest Debian version from debian/changelog in repo_dir, if present.
|
||||
Read the latest version from debian/changelog.
|
||||
|
||||
The first non-empty line typically looks like:
|
||||
package-name (1.2.3-1) unstable; urgency=medium
|
||||
|
||||
We extract the text inside the first parentheses.
|
||||
Expected format:
|
||||
package (1.2.3-1) unstable; urgency=medium
|
||||
"""
|
||||
path = os.path.join(repo_dir, "debian", "changelog")
|
||||
if not os.path.exists(path):
|
||||
if not os.path.isfile(path):
|
||||
return None
|
||||
|
||||
try:
|
||||
@@ -140,8 +134,7 @@ def read_debian_changelog_version(repo_dir: str) -> Optional[str]:
|
||||
continue
|
||||
match = re.search(r"\(([^)]+)\)", line)
|
||||
if match:
|
||||
version = match.group(1).strip()
|
||||
return version or None
|
||||
return match.group(1).strip() or None
|
||||
break
|
||||
except Exception:
|
||||
return None
|
||||
@@ -151,81 +144,72 @@ def read_debian_changelog_version(repo_dir: str) -> Optional[str]:
|
||||
|
||||
def read_spec_version(repo_dir: str) -> Optional[str]:
|
||||
"""
|
||||
Read the version from a RPM spec file.
|
||||
Read the version from an RPM spec file.
|
||||
|
||||
For now, we assume a fixed file name 'package-manager.spec'
|
||||
in repo_dir with lines like:
|
||||
|
||||
Version: 1.2.3
|
||||
Release: 1%{?dist}
|
||||
|
||||
Returns either "1.2.3-1" (if Release is present) or "1.2.3".
|
||||
Any RPM macro suffix like '%{?dist}' is stripped from the release.
|
||||
Combines:
|
||||
Version: 1.2.3
|
||||
Release: 1%{?dist}
|
||||
-> 1.2.3-1
|
||||
"""
|
||||
path = os.path.join(repo_dir, "package-manager.spec")
|
||||
if not os.path.exists(path):
|
||||
return None
|
||||
for fn in os.listdir(repo_dir):
|
||||
if not fn.endswith(".spec"):
|
||||
continue
|
||||
|
||||
try:
|
||||
with open(path, "r", encoding="utf-8") as f:
|
||||
text = f.read()
|
||||
except Exception:
|
||||
return None
|
||||
path = os.path.join(repo_dir, fn)
|
||||
try:
|
||||
with open(path, "r", encoding="utf-8") as f:
|
||||
text = f.read()
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
ver_match = re.search(r"^Version:\s*(.+)$", text, re.MULTILINE)
|
||||
if not ver_match:
|
||||
return None
|
||||
version = ver_match.group(1).strip()
|
||||
ver_match = re.search(r"^Version:\s*(.+)$", text, re.MULTILINE)
|
||||
if not ver_match:
|
||||
return None
|
||||
version = ver_match.group(1).strip()
|
||||
|
||||
rel_match = re.search(r"^Release:\s*(.+)$", text, re.MULTILINE)
|
||||
if rel_match:
|
||||
release_raw = rel_match.group(1).strip()
|
||||
# Strip common RPM macro suffix like %... (e.g. 1%{?dist})
|
||||
release = release_raw.split("%", 1)[0].strip()
|
||||
# Also strip anything after first whitespace, just in case
|
||||
release = release.split(" ", 1)[0].strip()
|
||||
if release:
|
||||
return f"{version}-{release}"
|
||||
rel_match = re.search(r"^Release:\s*(.+)$", text, re.MULTILINE)
|
||||
if rel_match:
|
||||
release_raw = rel_match.group(1).strip()
|
||||
release = release_raw.split("%", 1)[0].split(" ", 1)[0].strip()
|
||||
if release:
|
||||
return f"{version}-{release}"
|
||||
|
||||
return version or None
|
||||
return version or None
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def read_ansible_galaxy_version(repo_dir: str) -> Optional[str]:
|
||||
"""
|
||||
Read the version from Ansible Galaxy metadata, if present.
|
||||
Read the version from Ansible Galaxy metadata.
|
||||
|
||||
Supported locations:
|
||||
- galaxy.yml (preferred for modern roles/collections)
|
||||
- meta/main.yml (legacy style roles; uses galaxy_info.version or version)
|
||||
Supported:
|
||||
- galaxy.yml
|
||||
- meta/main.yml (galaxy_info.version or version)
|
||||
"""
|
||||
# 1) galaxy.yml in repo root
|
||||
galaxy_path = os.path.join(repo_dir, "galaxy.yml")
|
||||
if os.path.exists(galaxy_path):
|
||||
galaxy_yml = os.path.join(repo_dir, "galaxy.yml")
|
||||
if os.path.isfile(galaxy_yml):
|
||||
try:
|
||||
with open(galaxy_path, "r", encoding="utf-8") as f:
|
||||
with open(galaxy_yml, "r", encoding="utf-8") as f:
|
||||
data = yaml.safe_load(f) or {}
|
||||
version = data.get("version")
|
||||
if isinstance(version, str) and version.strip():
|
||||
return version.strip()
|
||||
except Exception:
|
||||
# Ignore parse errors and fall through to meta/main.yml
|
||||
pass
|
||||
|
||||
# 2) meta/main.yml (classic Ansible role)
|
||||
meta_path = os.path.join(repo_dir, "meta", "main.yml")
|
||||
if os.path.exists(meta_path):
|
||||
meta_yml = os.path.join(repo_dir, "meta", "main.yml")
|
||||
if os.path.isfile(meta_yml):
|
||||
try:
|
||||
with open(meta_path, "r", encoding="utf-8") as f:
|
||||
with open(meta_yml, "r", encoding="utf-8") as f:
|
||||
data = yaml.safe_load(f) or {}
|
||||
|
||||
# Preferred: galaxy_info.version
|
||||
galaxy_info = data.get("galaxy_info") or {}
|
||||
if isinstance(galaxy_info, dict):
|
||||
version = galaxy_info.get("version")
|
||||
if isinstance(version, str) and version.strip():
|
||||
return version.strip()
|
||||
|
||||
# Fallback: top-level 'version'
|
||||
version = data.get("version")
|
||||
if isinstance(version, str) and version.strip():
|
||||
return version.strip()
|
||||
|
||||
@@ -23,7 +23,7 @@ from unittest.mock import patch
|
||||
import pkgmgr.actions.install as install_mod
|
||||
from pkgmgr.actions.install import install_repos
|
||||
from pkgmgr.actions.install.installers.makefile import MakefileInstaller
|
||||
from pkgmgr.actions.install.installers.nix_flake import NixFlakeInstaller
|
||||
from pkgmgr.actions.install.installers.nix import NixFlakeInstaller
|
||||
from pkgmgr.actions.install.installers.os_packages.arch_pkgbuild import (
|
||||
ArchPkgbuildInstaller,
|
||||
)
|
||||
|
||||
@@ -20,9 +20,10 @@ import subprocess
|
||||
import tempfile
|
||||
import unittest
|
||||
from contextlib import redirect_stdout
|
||||
from typing import List
|
||||
from unittest.mock import patch
|
||||
|
||||
from pkgmgr.actions.install.installers.nix_flake import NixFlakeInstaller
|
||||
from pkgmgr.actions.install.installers.nix import NixFlakeInstaller
|
||||
|
||||
|
||||
class DummyCtx:
|
||||
@@ -60,42 +61,51 @@ class TestNixFlakeInstaller(unittest.TestCase):
|
||||
shutil.rmtree(self._tmpdir, ignore_errors=True)
|
||||
|
||||
@staticmethod
|
||||
def _cp(code: int) -> subprocess.CompletedProcess:
|
||||
# stdout/stderr are irrelevant here, but keep shape realistic
|
||||
return subprocess.CompletedProcess(args=["nix"], returncode=code, stdout="", stderr="")
|
||||
def _cp(code: int, stdout: str = "", stderr: str = "") -> subprocess.CompletedProcess:
|
||||
return subprocess.CompletedProcess(args=["nix"], returncode=code, stdout=stdout, stderr=stderr)
|
||||
|
||||
@staticmethod
|
||||
def _enable_nix_in_module(which_patch) -> None:
|
||||
"""Ensure shutil.which('nix') in nix_flake module returns a path."""
|
||||
"""Ensure shutil.which('nix') in nix installer module returns a path."""
|
||||
which_patch.return_value = "/usr/bin/nix"
|
||||
|
||||
@staticmethod
|
||||
def _install_cmds_from_calls(call_args_list) -> List[str]:
|
||||
cmds: List[str] = []
|
||||
for c in call_args_list:
|
||||
if not c.args:
|
||||
continue
|
||||
cmd = c.args[0]
|
||||
if isinstance(cmd, str) and cmd.startswith("nix profile install "):
|
||||
cmds.append(cmd)
|
||||
return cmds
|
||||
|
||||
def test_nix_flake_run_success(self) -> None:
|
||||
"""
|
||||
When run_command returns success (returncode 0), installer
|
||||
When install returns success (returncode 0), installer
|
||||
should report success and not raise.
|
||||
"""
|
||||
ctx = DummyCtx(identifier="some-lib", repo_dir=self.repo_dir)
|
||||
installer = NixFlakeInstaller()
|
||||
|
||||
install_results = [self._cp(0)] # first install succeeds
|
||||
|
||||
def fake_subprocess_run(cmd, *args, **kwargs):
|
||||
# cmd is a string because CommandRunner uses shell=True
|
||||
if isinstance(cmd, str) and cmd.startswith("nix profile list --json"):
|
||||
return self._cp(0, stdout='{"elements": []}', stderr="")
|
||||
if isinstance(cmd, str) and cmd.startswith("nix profile install "):
|
||||
return install_results.pop(0)
|
||||
return self._cp(0)
|
||||
|
||||
buf = io.StringIO()
|
||||
with patch("pkgmgr.actions.install.installers.nix_flake.shutil.which") as which_mock, patch(
|
||||
"pkgmgr.actions.install.installers.nix_flake.subprocess.run"
|
||||
) as subproc_mock, patch(
|
||||
"pkgmgr.actions.install.installers.nix_flake.run_command"
|
||||
) as run_cmd_mock, redirect_stdout(buf):
|
||||
with patch("pkgmgr.actions.install.installers.nix.installer.shutil.which") as which_mock, patch(
|
||||
"pkgmgr.actions.install.installers.nix.installer.os.path.exists", return_value=True
|
||||
), patch(
|
||||
"pkgmgr.actions.install.installers.nix.runner.subprocess.run", side_effect=fake_subprocess_run
|
||||
) as subproc_mock, redirect_stdout(buf):
|
||||
self._enable_nix_in_module(which_mock)
|
||||
|
||||
# For profile list JSON (used only on failure paths, but keep deterministic)
|
||||
subproc_mock.return_value = subprocess.CompletedProcess(
|
||||
args=["nix", "profile", "list", "--json"],
|
||||
returncode=0,
|
||||
stdout='{"elements": []}',
|
||||
stderr="",
|
||||
)
|
||||
|
||||
# Install succeeds
|
||||
run_cmd_mock.return_value = self._cp(0)
|
||||
|
||||
self.assertTrue(installer.supports(ctx))
|
||||
installer.run(ctx)
|
||||
|
||||
@@ -103,12 +113,8 @@ class TestNixFlakeInstaller(unittest.TestCase):
|
||||
self.assertIn("[nix] install: nix profile install", out)
|
||||
self.assertIn("[nix] output 'default' successfully installed.", out)
|
||||
|
||||
run_cmd_mock.assert_called_with(
|
||||
f"nix profile install {self.repo_dir}#default",
|
||||
cwd=self.repo_dir,
|
||||
preview=False,
|
||||
allow_failure=True,
|
||||
)
|
||||
install_cmds = self._install_cmds_from_calls(subproc_mock.call_args_list)
|
||||
self.assertEqual(install_cmds, [f"nix profile install {self.repo_dir}#default"])
|
||||
|
||||
def test_nix_flake_run_mandatory_failure_raises(self) -> None:
|
||||
"""
|
||||
@@ -118,34 +124,43 @@ class TestNixFlakeInstaller(unittest.TestCase):
|
||||
ctx = DummyCtx(identifier="some-lib", repo_dir=self.repo_dir)
|
||||
installer = NixFlakeInstaller()
|
||||
|
||||
# retry layer does one attempt (non-403), then fallback does final attempt => 2 installs
|
||||
install_results = [self._cp(1), self._cp(1)]
|
||||
|
||||
def fake_subprocess_run(cmd, *args, **kwargs):
|
||||
if isinstance(cmd, str) and cmd.startswith("nix profile list --json"):
|
||||
return self._cp(0, stdout='{"elements": []}', stderr="")
|
||||
if isinstance(cmd, str) and cmd.startswith("nix profile install "):
|
||||
return install_results.pop(0)
|
||||
return self._cp(0)
|
||||
|
||||
buf = io.StringIO()
|
||||
with patch("pkgmgr.actions.install.installers.nix_flake.shutil.which") as which_mock, patch(
|
||||
"pkgmgr.actions.install.installers.nix_flake.subprocess.run"
|
||||
) as subproc_mock, patch(
|
||||
"pkgmgr.actions.install.installers.nix_flake.run_command"
|
||||
) as run_cmd_mock, redirect_stdout(buf):
|
||||
with patch("pkgmgr.actions.install.installers.nix.installer.shutil.which") as which_mock, patch(
|
||||
"pkgmgr.actions.install.installers.nix.installer.os.path.exists", return_value=True
|
||||
), patch(
|
||||
"pkgmgr.actions.install.installers.nix.runner.subprocess.run", side_effect=fake_subprocess_run
|
||||
) as subproc_mock, redirect_stdout(buf):
|
||||
self._enable_nix_in_module(which_mock)
|
||||
|
||||
# No indices available (empty list)
|
||||
subproc_mock.return_value = subprocess.CompletedProcess(
|
||||
args=["nix", "profile", "list", "--json"],
|
||||
returncode=0,
|
||||
stdout='{"elements": []}',
|
||||
stderr="",
|
||||
)
|
||||
|
||||
# First install fails, retry fails -> should raise SystemExit(1)
|
||||
run_cmd_mock.side_effect = [self._cp(1), self._cp(1)]
|
||||
|
||||
self.assertTrue(installer.supports(ctx))
|
||||
with self.assertRaises(SystemExit) as cm:
|
||||
installer.run(ctx)
|
||||
|
||||
self.assertEqual(cm.exception.code, 1)
|
||||
|
||||
out = buf.getvalue()
|
||||
self.assertIn("[nix] install: nix profile install", out)
|
||||
self.assertIn("[ERROR] Failed to install Nix flake output 'default' (exit 1)", out)
|
||||
|
||||
install_cmds = self._install_cmds_from_calls(subproc_mock.call_args_list)
|
||||
self.assertEqual(
|
||||
install_cmds,
|
||||
[
|
||||
f"nix profile install {self.repo_dir}#default",
|
||||
f"nix profile install {self.repo_dir}#default",
|
||||
],
|
||||
)
|
||||
|
||||
def test_nix_flake_run_optional_failure_does_not_raise(self) -> None:
|
||||
"""
|
||||
For pkgmgr/package-manager repositories:
|
||||
@@ -156,29 +171,26 @@ class TestNixFlakeInstaller(unittest.TestCase):
|
||||
ctx = DummyCtx(identifier="pkgmgr", repo_dir=self.repo_dir)
|
||||
installer = NixFlakeInstaller()
|
||||
|
||||
# pkgmgr success (1 call), default fails (2 calls: attempt + final)
|
||||
install_results = [self._cp(0), self._cp(1), self._cp(1)]
|
||||
|
||||
def fake_subprocess_run(cmd, *args, **kwargs):
|
||||
if isinstance(cmd, str) and cmd.startswith("nix profile list --json"):
|
||||
return self._cp(0, stdout='{"elements": []}', stderr="")
|
||||
if isinstance(cmd, str) and cmd.startswith("nix profile install "):
|
||||
return install_results.pop(0)
|
||||
return self._cp(0)
|
||||
|
||||
buf = io.StringIO()
|
||||
with patch("pkgmgr.actions.install.installers.nix_flake.shutil.which") as which_mock, patch(
|
||||
"pkgmgr.actions.install.installers.nix_flake.subprocess.run"
|
||||
) as subproc_mock, patch(
|
||||
"pkgmgr.actions.install.installers.nix_flake.run_command"
|
||||
) as run_cmd_mock, redirect_stdout(buf):
|
||||
with patch("pkgmgr.actions.install.installers.nix.installer.shutil.which") as which_mock, patch(
|
||||
"pkgmgr.actions.install.installers.nix.installer.os.path.exists", return_value=True
|
||||
), patch(
|
||||
"pkgmgr.actions.install.installers.nix.runner.subprocess.run", side_effect=fake_subprocess_run
|
||||
) as subproc_mock, redirect_stdout(buf):
|
||||
self._enable_nix_in_module(which_mock)
|
||||
|
||||
# No indices available (empty list)
|
||||
subproc_mock.return_value = subprocess.CompletedProcess(
|
||||
args=["nix", "profile", "list", "--json"],
|
||||
returncode=0,
|
||||
stdout='{"elements": []}',
|
||||
stderr="",
|
||||
)
|
||||
|
||||
# pkgmgr install ok; default fails twice (initial + retry)
|
||||
run_cmd_mock.side_effect = [self._cp(0), self._cp(1), self._cp(1)]
|
||||
|
||||
self.assertTrue(installer.supports(ctx))
|
||||
|
||||
# Must NOT raise despite optional failure
|
||||
installer.run(ctx)
|
||||
installer.run(ctx) # must NOT raise
|
||||
|
||||
out = buf.getvalue()
|
||||
|
||||
@@ -192,14 +204,15 @@ class TestNixFlakeInstaller(unittest.TestCase):
|
||||
self.assertIn("[ERROR] Failed to install Nix flake output 'default' (exit 1)", out)
|
||||
self.assertIn("[WARNING] Continuing despite failure of optional output 'default'.", out)
|
||||
|
||||
# Verify run_command was called for both outputs (default twice due to retry)
|
||||
expected_calls = [
|
||||
(f"nix profile install {self.repo_dir}#pkgmgr",),
|
||||
(f"nix profile install {self.repo_dir}#default",),
|
||||
(f"nix profile install {self.repo_dir}#default",),
|
||||
]
|
||||
actual_cmds = [c.args[0] for c in run_cmd_mock.call_args_list]
|
||||
self.assertEqual(actual_cmds, [e[0] for e in expected_calls])
|
||||
install_cmds = self._install_cmds_from_calls(subproc_mock.call_args_list)
|
||||
self.assertEqual(
|
||||
install_cmds,
|
||||
[
|
||||
f"nix profile install {self.repo_dir}#pkgmgr",
|
||||
f"nix profile install {self.repo_dir}#default",
|
||||
f"nix profile install {self.repo_dir}#default",
|
||||
],
|
||||
)
|
||||
|
||||
def test_nix_flake_supports_respects_disable_env(self) -> None:
|
||||
"""
|
||||
@@ -209,7 +222,9 @@ class TestNixFlakeInstaller(unittest.TestCase):
|
||||
ctx = DummyCtx(identifier="pkgmgr", repo_dir=self.repo_dir, quiet=False)
|
||||
installer = NixFlakeInstaller()
|
||||
|
||||
with patch("pkgmgr.actions.install.installers.nix_flake.shutil.which") as which_mock:
|
||||
with patch("pkgmgr.actions.install.installers.nix.installer.shutil.which") as which_mock, patch(
|
||||
"pkgmgr.actions.install.installers.nix.installer.os.path.exists", return_value=True
|
||||
):
|
||||
self._enable_nix_in_module(which_mock)
|
||||
os.environ["PKGMGR_DISABLE_NIX_FLAKE_INSTALLER"] = "1"
|
||||
self.assertFalse(installer.supports(ctx))
|
||||
@@ -0,0 +1,106 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import unittest
|
||||
from unittest.mock import patch
|
||||
|
||||
from pkgmgr.actions.install.installers.nix.retry import GitHubRateLimitRetry, RetryPolicy
|
||||
from pkgmgr.actions.install.installers.nix.types import RunResult
|
||||
|
||||
|
||||
class DummyCtx:
|
||||
def __init__(self, quiet: bool = True) -> None:
|
||||
self.quiet = quiet
|
||||
|
||||
|
||||
class FakeRunner:
|
||||
"""
|
||||
Simulates a runner that returns:
|
||||
- HTTP 403 for the first N calls
|
||||
- success afterwards
|
||||
"""
|
||||
|
||||
def __init__(self, fail_count: int) -> None:
|
||||
self.fail_count = fail_count
|
||||
self.calls = 0
|
||||
|
||||
def run(self, ctx: DummyCtx, cmd: str, allow_failure: bool) -> RunResult:
|
||||
self.calls += 1
|
||||
|
||||
if self.calls <= self.fail_count:
|
||||
return RunResult(
|
||||
returncode=1,
|
||||
stdout="",
|
||||
stderr="error: HTTP error 403: rate limit exceeded (simulated)",
|
||||
)
|
||||
|
||||
return RunResult(returncode=0, stdout="ok", stderr="")
|
||||
|
||||
|
||||
class TestGitHub403Retry(unittest.TestCase):
|
||||
def test_retries_on_403_without_realtime_waiting(self) -> None:
|
||||
"""
|
||||
Ensure:
|
||||
- It retries only on GitHub 403-like errors
|
||||
- It does not actually sleep in realtime (time.sleep patched)
|
||||
- It stops once a success occurs
|
||||
- Wait times follow Fibonacci(base=30) + jitter
|
||||
"""
|
||||
policy = RetryPolicy(
|
||||
max_attempts=3, # attempts: 1,2,3
|
||||
base_delay_seconds=30, # fibonacci delays: 30, 30, 60
|
||||
jitter_seconds_min=0,
|
||||
jitter_seconds_max=60,
|
||||
)
|
||||
|
||||
retry = GitHubRateLimitRetry(policy=policy)
|
||||
ctx = DummyCtx(quiet=True)
|
||||
runner = FakeRunner(fail_count=2) # fail twice (403), then succeed
|
||||
|
||||
# Make jitter deterministic and prevent real sleeping.
|
||||
with patch("pkgmgr.actions.install.installers.nix.retry.random.randint", return_value=5) as jitter_mock, patch(
|
||||
"pkgmgr.actions.install.installers.nix.retry.time.sleep"
|
||||
) as sleep_mock:
|
||||
res = retry.run_with_retry(ctx, runner, "nix profile install /tmp#default")
|
||||
|
||||
# Result should be success on 3rd attempt.
|
||||
self.assertEqual(res.returncode, 0)
|
||||
self.assertEqual(runner.calls, 3)
|
||||
|
||||
# jitter should be used for each retry sleep (attempt 1->2, attempt 2->3) => 2 sleeps
|
||||
self.assertEqual(jitter_mock.call_count, 2)
|
||||
self.assertEqual(sleep_mock.call_count, 2)
|
||||
|
||||
# Fibonacci delays for attempts=3: [30, 30, 60]
|
||||
# sleep occurs after failed attempt 1 and 2, so base delays are 30 and 30
|
||||
# wait_time = base_delay + jitter(5) => 35, 35
|
||||
sleep_args = [c.args[0] for c in sleep_mock.call_args_list]
|
||||
self.assertEqual(sleep_args, [35, 35])
|
||||
|
||||
def test_does_not_retry_on_non_403_errors(self) -> None:
|
||||
"""
|
||||
Ensure it does not retry when the error is not recognized as GitHub 403/rate limit.
|
||||
"""
|
||||
policy = RetryPolicy(max_attempts=7, base_delay_seconds=30)
|
||||
retry = GitHubRateLimitRetry(policy=policy)
|
||||
ctx = DummyCtx(quiet=True)
|
||||
|
||||
class Non403Runner:
|
||||
def __init__(self) -> None:
|
||||
self.calls = 0
|
||||
|
||||
def run(self, ctx: DummyCtx, cmd: str, allow_failure: bool) -> RunResult:
|
||||
self.calls += 1
|
||||
return RunResult(returncode=1, stdout="", stderr="some other error (simulated)")
|
||||
|
||||
runner = Non403Runner()
|
||||
|
||||
with patch("pkgmgr.actions.install.installers.nix.retry.time.sleep") as sleep_mock:
|
||||
res = retry.run_with_retry(ctx, runner, "nix profile install /tmp#default")
|
||||
|
||||
self.assertEqual(res.returncode, 1)
|
||||
self.assertEqual(runner.calls, 1) # no retries
|
||||
self.assertEqual(sleep_mock.call_count, 0)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user