Compare commits

...

14 Commits

Author SHA1 Message Date
Kevin Veen-Birkenbach
d1e5a71f77 Merge branch 'feature/mirror-provision'
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / codesniffer-shellcheck (push) Has been cancelled
Mark stable commit / codesniffer-ruff (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
2025-12-14 10:45:51 +01:00
Kevin Veen-Birkenbach
d59dc8ad53 fix(cli): route update exclusively through UpdateManager
Some checks failed
CI / test-unit (push) Has been cancelled
CI / test-integration (push) Has been cancelled
CI / test-env-virtual (push) Has been cancelled
CI / test-env-nix (push) Has been cancelled
CI / test-e2e (push) Has been cancelled
CI / test-virgin-user (push) Has been cancelled
CI / test-virgin-root (push) Has been cancelled
CI / codesniffer-shellcheck (push) Has been cancelled
CI / codesniffer-ruff (push) Has been cancelled
* Remove `update` from repos command dispatch
* Prevent update from being handled by `handle_repos_command`
* Ensure top-level `update` always uses UpdateManager
* Fix "Unknown repos command: update" error after refactor

https://chatgpt.com/share/693e7ee9-2658-800f-985f-293ed0c8efbc
2025-12-14 10:09:46 +01:00
Kevin Veen-Birkenbach
55f4a1e941 refactor(update): move update logic to unified UpdateManager and extend system support
Some checks failed
CI / test-unit (push) Has been cancelled
CI / test-integration (push) Has been cancelled
CI / test-env-virtual (push) Has been cancelled
CI / test-env-nix (push) Has been cancelled
CI / test-e2e (push) Has been cancelled
CI / test-virgin-user (push) Has been cancelled
CI / test-virgin-root (push) Has been cancelled
CI / codesniffer-shellcheck (push) Has been cancelled
CI / codesniffer-ruff (push) Has been cancelled
- Move update orchestration from repository scope to actions/update
- Introduce UpdateManager and SystemUpdater with distro detection
- Add Arch, Debian/Ubuntu, and Fedora/RHEL system update handling
- Rename CLI flag from --system-update to --system
- Route update as a top-level command in CLI dispatch
- Remove legacy update_repos implementation
- Add E2E tests for:
  - update all without system updates
  - update single repo (pkgmgr) with system updates

https://chatgpt.com/share/693e76ec-5ee4-800f-9623-3983f56d5430
2025-12-14 09:35:52 +01:00
Kevin Veen-Birkenbach
2a4ec18532 Changed argument order
Some checks failed
CI / test-unit (push) Has been cancelled
CI / test-integration (push) Has been cancelled
CI / test-env-virtual (push) Has been cancelled
CI / test-env-nix (push) Has been cancelled
CI / test-e2e (push) Has been cancelled
CI / test-virgin-user (push) Has been cancelled
CI / test-virgin-root (push) Has been cancelled
CI / codesniffer-shellcheck (push) Has been cancelled
CI / codesniffer-ruff (push) Has been cancelled
2025-12-14 08:51:37 +01:00
Kevin Veen-Birkenbach
2debdbee09 * **Split mirror responsibilities into clear subcommands**
Some checks failed
CI / test-unit (push) Has been cancelled
CI / test-integration (push) Has been cancelled
CI / test-env-virtual (push) Has been cancelled
CI / test-env-nix (push) Has been cancelled
CI / test-e2e (push) Has been cancelled
CI / test-virgin-user (push) Has been cancelled
CI / test-virgin-root (push) Has been cancelled
CI / codesniffer-shellcheck (push) Has been cancelled
CI / codesniffer-ruff (push) Has been cancelled
Setup configures local Git state, check validates remote reachability in a read-only way, and provision explicitly creates missing remote repositories. Destructive behavior is never implicit.

* **Introduce a remote provisioning layer**
  pkgmgr can now ensure that repositories exist on remote providers. If a repository is missing, it can be created automatically on supported platforms when explicitly requested.

* **Add a provider registry for extensibility**
  Providers are resolved based on the remote host, with optional hints to force a specific backend. This makes it straightforward to add further providers later without changing the core logic.

* **Use a lightweight, dependency-free HTTP client**
  All API communication is handled via a small stdlib-based client. HTTP errors are mapped to meaningful domain errors, improving diagnostics and error handling consistency.

* **Centralize credential resolution**
  API tokens are resolved in a strict order: environment variables first, then the system keyring, and finally an interactive prompt if allowed. This works well for both CI and interactive use.

* **Keep keyring integration optional**
  Secure token storage via the OS keyring is provided as an optional dependency. If unavailable, pkgmgr still works using environment variables or one-off interactive tokens.

* **Improve CLI parser safety and clarity**
  Shared argument helpers now guard against duplicate definitions, making composed subcommands more robust and easier to maintain.

* **Expand end-to-end test coverage**
  All mirror-related workflows are exercised through real CLI invocations in preview mode, ensuring full wiring correctness while remaining safe for automated test environments.

https://chatgpt.com/share/693df441-a780-800f-bcf7-96e06cc9e421
2025-12-14 00:16:54 +01:00
Kevin Veen-Birkenbach
4cb62e90f8 refactor: move nix experimental feature setup to nix.conf and rename pkgmgr wrapper
Some checks failed
CI / test-unit (push) Has been cancelled
CI / test-integration (push) Has been cancelled
CI / test-env-virtual (push) Has been cancelled
CI / test-env-nix (push) Has been cancelled
CI / test-e2e (push) Has been cancelled
CI / test-virgin-user (push) Has been cancelled
CI / test-virgin-root (push) Has been cancelled
CI / codesniffer-shellcheck (push) Has been cancelled
CI / codesniffer-ruff (push) Has been cancelled
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / codesniffer-shellcheck (push) Has been cancelled
Mark stable commit / codesniffer-ruff (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
https://chatgpt.com/share/693dcbad-3d30-800f-acfe-22f7263f3e80
2025-12-13 21:25:02 +01:00
Kevin Veen-Birkenbach
923519497a Updated Homepage
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / codesniffer-shellcheck (push) Has been cancelled
Mark stable commit / codesniffer-ruff (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
2025-12-13 20:41:06 +01:00
Kevin Veen-Birkenbach
5fa18cb449 Merge branch 'fix/self-install'
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / codesniffer-shellcheck (push) Has been cancelled
Mark stable commit / codesniffer-ruff (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
2025-12-13 20:09:17 +01:00
Kevin Veen-Birkenbach
f513196911 Used correct tabulation
Some checks failed
CI / test-unit (push) Has been cancelled
CI / test-integration (push) Has been cancelled
CI / test-env-virtual (push) Has been cancelled
CI / test-env-nix (push) Has been cancelled
CI / test-e2e (push) Has been cancelled
CI / test-virgin-user (push) Has been cancelled
CI / test-virgin-root (push) Has been cancelled
CI / codesniffer-shellcheck (push) Has been cancelled
CI / codesniffer-ruff (push) Has been cancelled
2025-12-13 20:08:30 +01:00
Kevin Veen-Birkenbach
7f06447bbd feat(cli): add --system-update flag to update command
Some checks failed
CI / test-unit (push) Has been cancelled
CI / test-integration (push) Has been cancelled
CI / test-env-virtual (push) Has been cancelled
CI / test-env-nix (push) Has been cancelled
CI / test-e2e (push) Has been cancelled
CI / test-virgin-user (push) Has been cancelled
CI / test-virgin-root (push) Has been cancelled
CI / codesniffer-shellcheck (push) Has been cancelled
CI / codesniffer-ruff (push) Has been cancelled
- Register --system-update for `pkgmgr update`
- Expose args.system_update for update workflow
- Align CLI with update_repos and E2E tests

https://chatgpt.com/share/693db645-c420-800f-b921-9d5c0356d0ac
2025-12-13 20:02:48 +01:00
Kevin Veen-Birkenbach
1e5d6d3eee test(unit): update NixFlakeInstaller tests for new run_command-based logic
- Adapt DummyCtx to include quiet and force_update flags
- Replace os.system mocking with run_command/subprocess mocks
- Align assertions with new Nix install/upgrade output
- Keep coverage for mandatory vs optional output handling

https://chatgpt.com/share/693db645-c420-800f-b921-9d5c0356d0ac
2025-12-13 19:53:34 +01:00
Kevin Veen-Birkenbach
f2970adbb2 test(e2e): enforce --system-update and isolate update-all integration tests
- Require --system-update for update-all integration tests
- Run tests with isolated HOME and temporary gitconfig
- Allow /src as git safe.directory for nix run
- Capture and print combined stdout/stderr on failure
- Ensure consistent environment for pkgmgr and nix-run executions
2025-12-13 19:49:40 +01:00
Kevin Veen-Birkenbach
7f262c6557 feat(install): add --update to re-run active-layer installers and improve Nix refresh logic
Some checks failed
CI / test-unit (push) Has been cancelled
CI / test-integration (push) Has been cancelled
CI / test-env-virtual (push) Has been cancelled
CI / test-env-nix (push) Has been cancelled
CI / test-e2e (push) Has been cancelled
CI / test-virgin-user (push) Has been cancelled
CI / test-virgin-root (push) Has been cancelled
CI / codesniffer-shellcheck (push) Has been cancelled
CI / codesniffer-ruff (push) Has been cancelled
* Add `force_update` to `RepoContext` and propagate it through install/update flows
* Add `pkgmgr install --update` to force re-running installers even if the same CLI layer is already loaded
* Enhance `NixFlakeInstaller` to ensure correct outputs (pkgmgr + optional default for package-manager) and support refresh/upgrade with index-based fallback remove+reinstall
* Make Python/Makefile installers emit an “upgraded” marker when `force_update` is used
* Add E2E tests for “three times install” scenarios (makefile, nix, venv) with shared run helper
* Fix git safe.directory wildcard quoting in E2E shell runner and minor cleanup/reordering of imports/comments

https://chatgpt.com/share/693db0b4-6ea4-800f-b44a-f03939c7fb9e
2025-12-13 19:30:06 +01:00
Kevin Veen-Birkenbach
0bc7a3ecc0 ci(nix): retry flake evaluation on GitHub API rate limits
Some checks failed
CI / test-unit (push) Has been cancelled
CI / test-integration (push) Has been cancelled
CI / test-env-virtual (push) Has been cancelled
CI / test-env-nix (push) Has been cancelled
CI / test-e2e (push) Has been cancelled
CI / test-virgin-user (push) Has been cancelled
CI / test-virgin-root (push) Has been cancelled
CI / codesniffer-shellcheck (push) Has been cancelled
CI / codesniffer-ruff (push) Has been cancelled
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / codesniffer-shellcheck (push) Has been cancelled
Mark stable commit / codesniffer-ruff (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
Add a reusable retry helper that detects GitHub API 403 rate-limit errors
during Nix flake evaluation and retries with exponential backoff.

Apply the retry logic to flake-only CI tests so transient GitHub rate
limits no longer cause random CI failures while preserving fast failure
for real errors.

https://chatgpt.com/share/693d7ec5-ac70-800f-a627-ef705c653ba1
2025-12-13 15:57:05 +01:00
60 changed files with 2334 additions and 963 deletions

View File

@@ -46,8 +46,6 @@ jobs:
. "$HOME/.venvs/pkgmgr/bin/activate"
export NIX_CONFIG="experimental-features = nix-command flakes"
pkgmgr update pkgmgr --clone-mode shallow --no-verification
pkgmgr version pkgmgr

View File

@@ -59,7 +59,6 @@ jobs:
pkgmgr version pkgmgr
export NIX_REMOTE=local
export NIX_CONFIG=\"experimental-features = nix-command flakes\"
nix run /src#pkgmgr -- version pkgmgr
"
'

View File

@@ -36,9 +36,6 @@ CMD ["bash"]
# ============================================================
FROM virgin AS full
# Nix environment defaults (only config; nix itself comes from deps/install flow)
ENV NIX_CONFIG="experimental-features = nix-command flakes"
WORKDIR /build
# Copy full repository for build

View File

@@ -47,7 +47,7 @@ package() {
cd "$srcdir/$_srcdir_name"
# Install the wrapper into /usr/bin
install -Dm0755 "scripts/pkgmgr-wrapper.sh" \
install -Dm0755 "scripts/launcher.sh" \
"$pkgdir/usr/bin/pkgmgr"
# Install Nix bootstrap (init + lib)

View File

@@ -28,7 +28,7 @@ override_dh_auto_install:
install -d debian/package-manager/usr/lib/package-manager
# Install wrapper
install -m0755 scripts/pkgmgr-wrapper.sh \
install -m0755 scripts/launcher.sh \
debian/package-manager/usr/bin/pkgmgr
# Install Nix bootstrap (init + lib)

View File

@@ -42,7 +42,7 @@ install -d %{buildroot}/usr/lib/package-manager
cp -a . %{buildroot}/usr/lib/package-manager/
# Wrapper
install -m0755 scripts/pkgmgr-wrapper.sh %{buildroot}%{_bindir}/pkgmgr
install -m0755 scripts/launcher.sh %{buildroot}%{_bindir}/pkgmgr
# Nix bootstrap (init + lib)
install -d %{buildroot}/usr/lib/package-manager/nix

View File

@@ -23,12 +23,12 @@ dependencies = [
]
[project.urls]
Homepage = "https://github.com/kevinveenbirkenbach/package-manager"
Homepage = "https://s.veen.world/pkgmgr"
Source = "https://github.com/kevinveenbirkenbach/package-manager"
[project.optional-dependencies]
keyring = ["keyring>=24.0.0"]
dev = [
"pytest",
"mypy"
]

View File

@@ -1,11 +1,6 @@
#!/usr/bin/env bash
set -euo pipefail
# Ensure NIX_CONFIG has our defaults if not already set
if [[ -z "${NIX_CONFIG:-}" ]]; then
export NIX_CONFIG="experimental-features = nix-command flakes"
fi
FLAKE_DIR="/usr/lib/package-manager"
# ---------------------------------------------------------------------------
@@ -43,6 +38,6 @@ if command -v nix >/dev/null 2>&1; then
exec nix run "${FLAKE_DIR}#pkgmgr" -- "$@"
fi
echo "[pkgmgr-wrapper] ERROR: 'nix' binary not found on PATH after init."
echo "[pkgmgr-wrapper] Nix is required to run pkgmgr (no Python fallback)."
echo "[launcher] ERROR: 'nix' binary not found on PATH after init."
echo "[launcher] Nix is required to run pkgmgr (no Python fallback)."
exit 1

View File

@@ -11,45 +11,79 @@ nixconf_file_path() {
echo "/etc/nix/nix.conf"
}
nixconf_ensure_experimental_features() {
local nix_conf want
nix_conf="$(nixconf_file_path)"
want="experimental-features = nix-command flakes"
# Ensure a given nix.conf key contains required tokens (merged, no duplicates)
nixconf_ensure_features_key() {
local nix_conf="$1"
local key="$2"
shift 2
local required=("$@")
mkdir -p /etc/nix
# Create file if missing (with just the required tokens)
if [[ ! -f "${nix_conf}" ]]; then
local want="${key} = ${required[*]}"
echo "[nix-conf] Creating ${nix_conf} with: ${want}"
printf "%s\n" "${want}" >"${nix_conf}"
return 0
fi
if grep -qE '^\s*experimental-features\s*=' "${nix_conf}"; then
if grep -qE '^\s*experimental-features\s*=.*\bnix-command\b' "${nix_conf}" \
&& grep -qE '^\s*experimental-features\s*=.*\bflakes\b' "${nix_conf}"; then
echo "[nix-conf] experimental-features already correct"
# Key exists -> merge tokens
if grep -qE "^\s*${key}\s*=" "${nix_conf}"; then
local ok=1
local t
for t in "${required[@]}"; do
if ! grep -qE "^\s*${key}\s*=.*\b${t}\b" "${nix_conf}"; then
ok=0
break
fi
done
if [[ "$ok" -eq 1 ]]; then
echo "[nix-conf] ${key} already correct"
return 0
fi
echo "[nix-conf] Extending experimental-features in ${nix_conf}"
echo "[nix-conf] Extending ${key} in ${nix_conf}"
local current
current="$(grep -E '^\s*experimental-features\s*=' "${nix_conf}" | head -n1 | cut -d= -f2-)"
current="$(grep -E "^\s*${key}\s*=" "${nix_conf}" | head -n1 | cut -d= -f2-)"
current="$(echo "${current}" | xargs)" # trim
# Build a merged feature string without duplicates (simple token set)
local merged="nix-command flakes"
local merged=""
local token
# Start with existing tokens
for token in ${current}; do
if [[ " ${merged} " != *" ${token} "* ]]; then
merged="${merged} ${token}"
fi
done
sed -i "s|^\s*experimental-features\s*=.*|experimental-features = ${merged}|" "${nix_conf}"
# Add required tokens
for token in "${required[@]}"; do
if [[ " ${merged} " != *" ${token} "* ]]; then
merged="${merged} ${token}"
fi
done
merged="$(echo "${merged}" | xargs)" # trim
sed -i "s|^\s*${key}\s*=.*|${key} = ${merged}|" "${nix_conf}"
return 0
fi
# Key missing -> append
local want="${key} = ${required[*]}"
echo "[nix-conf] Appending to ${nix_conf}: ${want}"
printf "\n%s\n" "${want}" >>"${nix_conf}"
}
nixconf_ensure_experimental_features() {
local nix_conf
nix_conf="$(nixconf_file_path)"
# Ensure both keys to avoid prompts and cover older/alternate expectations
nixconf_ensure_features_key "${nix_conf}" "experimental-features" "nix-command" "flakes"
nixconf_ensure_features_key "${nix_conf}" "extra-experimental-features" "nix-command" "flakes"
}

52
scripts/nix/lib/retry_403.sh Executable file
View File

@@ -0,0 +1,52 @@
#!/usr/bin/env bash
set -euo pipefail
if [[ -n "${PKGMGR_NIX_RETRY_403_SH:-}" ]]; then
return 0
fi
PKGMGR_NIX_RETRY_403_SH=1
# Retry only when we see the GitHub API rate limit 403 error during nix flake evaluation.
# Retries 7 times with delays: 10, 30, 50, 80, 130, 210, 420 seconds.
run_with_github_403_retry() {
local -a delays=(10 30 50 80 130 210 420)
local attempt=0
local max_retries="${#delays[@]}"
while true; do
local err tmp
tmp="$(mktemp -t nix-err.XXXXXX)"
err=0
# Run the command; capture stderr for inspection while preserving stdout.
if "$@" 2>"$tmp"; then
rm -f "$tmp"
return 0
else
err=$?
fi
# Only retry on the specific GitHub API rate limit 403 case.
if grep -qE 'HTTP error 403' "$tmp" && grep -qiE 'API rate limit exceeded|api\.github\.com' "$tmp"; then
if (( attempt >= max_retries )); then
cat "$tmp" >&2
rm -f "$tmp"
return "$err"
fi
local sleep_s="${delays[$attempt]}"
attempt=$((attempt + 1))
echo "[nix-retry] GitHub API rate-limit (403). Retry ${attempt}/${max_retries} in ${sleep_s}s: $*" >&2
cat "$tmp" >&2
rm -f "$tmp"
sleep "$sleep_s"
continue
fi
# Not our retry case -> fail fast with original stderr.
cat "$tmp" >&2
rm -f "$tmp"
return "$err"
done
}

View File

@@ -49,7 +49,7 @@ docker run --rm \
# Gitdir path shown in the "dubious ownership" error
git config --global --add safe.directory /src/.git || true
# Ephemeral CI containers: allow all paths as a last resort
git config --global --add safe.directory '*' || true
git config --global --add safe.directory "*" || true
fi
# Run the E2E tests inside the Nix development shell

View File

@@ -27,7 +27,7 @@ docker run --rm \
echo ">>> preflight: nix must exist in image"
if ! command -v nix >/dev/null 2>&1; then
echo "NO_NIX"
echo "ERROR: nix not found in image '\'''"${IMAGE}"''\'' (PKGMGR_DISTRO='"${PKGMGR_DISTRO}"')"
echo "ERROR: nix not found in image '"${IMAGE}"' (PKGMGR_DISTRO='"${PKGMGR_DISTRO}"')"
echo "HINT: Ensure Nix is installed during image build for this distro."
exit 1
fi
@@ -35,14 +35,28 @@ docker run --rm \
echo ">>> nix version"
nix --version
# ------------------------------------------------------------
# Retry helper for GitHub API rate-limit (HTTP 403)
# ------------------------------------------------------------
if [[ -f /src/scripts/nix/lib/retry_403.sh ]]; then
# shellcheck source=./scripts/nix/lib/retry_403.sh
source /src/scripts/nix/lib/retry_403.sh
elif [[ -f ./scripts/nix/lib/retry_403.sh ]]; then
# shellcheck source=./scripts/nix/lib/retry_403.sh
source ./scripts/nix/lib/retry_403.sh
else
echo "ERROR: retry helper not found: scripts/nix/lib/retry_403.sh"
exit 1
fi
echo ">>> nix flake show"
nix flake show . --no-write-lock-file >/dev/null
run_with_github_403_retry nix flake show . --no-write-lock-file >/dev/null
echo ">>> nix build .#default"
nix build .#default --no-link --no-write-lock-file
run_with_github_403_retry nix build .#default --no-link --no-write-lock-file
echo ">>> nix run .#pkgmgr -- --help"
nix run .#pkgmgr -- --help --no-write-lock-file
run_with_github_403_retry nix run .#pkgmgr -- --help --no-write-lock-file
echo ">>> OK: Nix flake-only test succeeded."
'

View File

@@ -1,3 +1,4 @@
# src/pkgmgr/actions/install/__init__.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
@@ -36,10 +37,8 @@ from pkgmgr.actions.install.installers.makefile import (
)
from pkgmgr.actions.install.pipeline import InstallationPipeline
Repository = Dict[str, Any]
# All available installers, in the order they should be considered.
INSTALLERS = [
ArchPkgbuildInstaller(),
DebianControlInstaller(),
@@ -50,11 +49,6 @@ INSTALLERS = [
]
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _ensure_repo_dir(
repo: Repository,
repositories_base_dir: str,
@@ -137,6 +131,7 @@ def _create_context(
quiet: bool,
clone_mode: str,
update_dependencies: bool,
force_update: bool,
) -> RepoContext:
"""
Build a RepoContext instance for the given repository.
@@ -153,14 +148,10 @@ def _create_context(
quiet=quiet,
clone_mode=clone_mode,
update_dependencies=update_dependencies,
force_update=force_update,
)
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def install_repos(
selected_repos: List[Repository],
repositories_base_dir: str,
@@ -171,10 +162,14 @@ def install_repos(
quiet: bool,
clone_mode: str,
update_dependencies: bool,
force_update: bool = False,
) -> None:
"""
Install one or more repositories according to the configured installers
and the CLI layer precedence rules.
If force_update=True, installers of the currently active layer are allowed
to run again (upgrade/refresh), even if that layer is already loaded.
"""
pipeline = InstallationPipeline(INSTALLERS)
@@ -213,6 +208,7 @@ def install_repos(
quiet=quiet,
clone_mode=clone_mode,
update_dependencies=update_dependencies,
force_update=force_update,
)
pipeline.run(ctx)

View File

@@ -1,3 +1,4 @@
# src/pkgmgr/actions/install/context.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
@@ -28,3 +29,6 @@ class RepoContext:
quiet: bool
clone_mode: str
update_dependencies: bool
# If True, allow re-running installers of the currently active layer.
force_update: bool = False

View File

@@ -1,3 +1,4 @@
# src/pkgmgr/actions/install/installers/makefile.py
from __future__ import annotations
import os
@@ -9,89 +10,45 @@ from pkgmgr.core.command.run import run_command
class MakefileInstaller(BaseInstaller):
"""
Generic installer that runs `make install` if a Makefile with an
install target is present.
Safety rules:
- If PKGMGR_DISABLE_MAKEFILE_INSTALLER=1 is set, this installer
is globally disabled.
- The higher-level InstallationPipeline ensures that Makefile
installation does not run if a stronger CLI layer already owns
the command (e.g. Nix or OS packages).
"""
layer = "makefile"
MAKEFILE_NAME = "Makefile"
def supports(self, ctx: RepoContext) -> bool:
"""
Return True if this repository has a Makefile and the installer
is not globally disabled.
"""
# Optional global kill switch.
if os.environ.get("PKGMGR_DISABLE_MAKEFILE_INSTALLER") == "1":
if not ctx.quiet:
print(
"[INFO] MakefileInstaller is disabled via "
"PKGMGR_DISABLE_MAKEFILE_INSTALLER."
)
print("[INFO] PKGMGR_DISABLE_MAKEFILE_INSTALLER=1 skipping MakefileInstaller.")
return False
makefile_path = os.path.join(ctx.repo_dir, self.MAKEFILE_NAME)
return os.path.exists(makefile_path)
def _has_install_target(self, makefile_path: str) -> bool:
"""
Heuristically check whether the Makefile defines an install target.
We look for:
- a plain 'install:' target, or
- any 'install-*:' style target.
"""
try:
with open(makefile_path, "r", encoding="utf-8", errors="ignore") as f:
content = f.read()
except OSError:
return False
# Simple heuristics: look for "install:" or targets starting with "install-"
if re.search(r"^install\s*:", content, flags=re.MULTILINE):
return True
if re.search(r"^install-[a-zA-Z0-9_-]*\s*:", content, flags=re.MULTILINE):
return True
return False
def run(self, ctx: RepoContext) -> None:
"""
Execute `make install` in the repository directory if an install
target exists.
"""
makefile_path = os.path.join(ctx.repo_dir, self.MAKEFILE_NAME)
if not os.path.exists(makefile_path):
if not ctx.quiet:
print(
f"[pkgmgr] Makefile '{makefile_path}' not found, "
"skipping MakefileInstaller."
)
return
if not self._has_install_target(makefile_path):
if not ctx.quiet:
print(
f"[pkgmgr] No 'install' target found in {makefile_path}."
)
print(f"[pkgmgr] No 'install' target found in {makefile_path}.")
return
if not ctx.quiet:
print(
f"[pkgmgr] Running 'make install' in {ctx.repo_dir} "
"(MakefileInstaller)"
)
print(f"[pkgmgr] Running make install for {ctx.identifier} (MakefileInstaller)")
cmd = "make install"
run_command(cmd, cwd=ctx.repo_dir, preview=ctx.preview)
run_command("make install", cwd=ctx.repo_dir, preview=ctx.preview)
if ctx.force_update and not ctx.quiet:
print(f"[makefile] repo '{ctx.identifier}' successfully upgraded.")

View File

@@ -1,32 +1,12 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Installer for Nix flakes.
If a repository contains flake.nix and the 'nix' command is available, this
installer will try to install profile outputs from the flake.
Behavior:
- If flake.nix is present and `nix` exists on PATH:
* First remove any existing `package-manager` profile entry (best-effort).
* Then install one or more flake outputs via `nix profile install`.
- For the package-manager repo:
* `pkgmgr` is mandatory (CLI), `default` is optional.
- For all other repos:
* `default` is mandatory.
Special handling:
- If PKGMGR_DISABLE_NIX_FLAKE_INSTALLER=1 is set, the installer is
globally disabled (useful for CI or debugging).
The higher-level InstallationPipeline and CLI-layer model decide when this
installer is allowed to run, based on where the current CLI comes from
(e.g. Nix, OS packages, Python, Makefile).
"""
from __future__ import annotations
import json
import os
import shutil
import subprocess
from typing import TYPE_CHECKING, List, Tuple
from pkgmgr.actions.install.installers.base import BaseInstaller
@@ -34,132 +14,225 @@ from pkgmgr.core.command.run import run_command
if TYPE_CHECKING:
from pkgmgr.actions.install.context import RepoContext
from pkgmgr.actions.install import InstallContext
class NixFlakeInstaller(BaseInstaller):
"""Install Nix flake profiles for repositories that define flake.nix."""
# Logical layer name, used by capability matchers.
layer = "nix"
FLAKE_FILE = "flake.nix"
PROFILE_NAME = "package-manager"
def supports(self, ctx: "RepoContext") -> bool:
"""
Only support repositories that:
- Are NOT explicitly disabled via PKGMGR_DISABLE_NIX_FLAKE_INSTALLER=1,
- Have a flake.nix,
- And have the `nix` command available.
"""
# Optional global kill-switch for CI or debugging.
if os.environ.get("PKGMGR_DISABLE_NIX_FLAKE_INSTALLER") == "1":
print(
"[INFO] PKGMGR_DISABLE_NIX_FLAKE_INSTALLER=1 "
"NixFlakeInstaller is disabled."
)
if not ctx.quiet:
print("[INFO] PKGMGR_DISABLE_NIX_FLAKE_INSTALLER=1 skipping NixFlakeInstaller.")
return False
# Nix must be available.
if shutil.which("nix") is None:
return False
# flake.nix must exist in the repository.
flake_path = os.path.join(ctx.repo_dir, self.FLAKE_FILE)
return os.path.exists(flake_path)
def _ensure_old_profile_removed(self, ctx: "RepoContext") -> None:
"""
Best-effort removal of an existing profile entry.
This handles the "already provides the following file" conflict by
removing previous `package-manager` installations before we install
the new one.
Any error in `nix profile remove` is intentionally ignored, because
a missing profile entry is not a fatal condition.
"""
if shutil.which("nix") is None:
return
cmd = f"nix profile remove {self.PROFILE_NAME} || true"
try:
# NOTE: no allow_failure here → matches the existing unit tests
run_command(cmd, cwd=ctx.repo_dir, preview=ctx.preview)
except SystemExit:
# Unit tests explicitly assert this is swallowed
pass
return os.path.exists(os.path.join(ctx.repo_dir, self.FLAKE_FILE))
def _profile_outputs(self, ctx: "RepoContext") -> List[Tuple[str, bool]]:
"""
Decide which flake outputs to install and whether failures are fatal.
Returns a list of (output_name, allow_failure) tuples.
Rules:
- For the package-manager repo (identifier 'pkgmgr' or 'package-manager'):
[("pkgmgr", False), ("default", True)]
- For all other repos:
[("default", False)]
"""
ident = ctx.identifier
if ident in {"pkgmgr", "package-manager"}:
# pkgmgr: main CLI output is "pkgmgr" (mandatory),
# "default" is nice-to-have (non-fatal).
# (output_name, allow_failure)
if ctx.identifier in {"pkgmgr", "package-manager"}:
return [("pkgmgr", False), ("default", True)]
# Generic repos: we expect a sensible "default" package/app.
# Failure to install it is considered fatal.
return [("default", False)]
def run(self, ctx: "InstallContext") -> None:
"""
Install Nix flake profile outputs.
def _installable(self, ctx: "RepoContext", output: str) -> str:
return f"{ctx.repo_dir}#{output}"
For the package-manager repo, failure installing 'pkgmgr' is fatal,
failure installing 'default' is non-fatal.
For other repos, failure installing 'default' is fatal.
"""
# Reuse supports() to keep logic in one place.
if not self.supports(ctx): # type: ignore[arg-type]
return
outputs = self._profile_outputs(ctx) # list of (name, allow_failure)
print(
"Nix flake detected in "
f"{ctx.identifier}, attempting to install profile outputs: "
+ ", ".join(name for name, _ in outputs)
def _run(self, ctx: "RepoContext", cmd: str, allow_failure: bool = True):
return run_command(
cmd,
cwd=ctx.repo_dir,
preview=ctx.preview,
allow_failure=allow_failure,
)
# Handle the "already installed" case up-front for the shared profile.
self._ensure_old_profile_removed(ctx) # type: ignore[arg-type]
def _profile_list_json(self, ctx: "RepoContext") -> dict:
"""
Read current Nix profile entries as JSON (best-effort).
for output, allow_failure in outputs:
cmd = f"nix profile install {ctx.repo_dir}#{output}"
print(f"[INFO] Running: {cmd}")
ret = os.system(cmd)
NOTE: Nix versions differ:
- Newer: {"elements": [ { "index": 0, "attrPath": "...", ... }, ... ]}
- Older: {"elements": [ "nixpkgs#hello", ... ]} (strings)
# Extract real exit code from os.system() result
if os.WIFEXITED(ret):
exit_code = os.WEXITSTATUS(ret)
else:
# abnormal termination (signal etc.) keep raw value
exit_code = ret
We return {} on failure or in preview mode.
"""
if ctx.preview:
return {}
if exit_code == 0:
print(f"Nix flake output '{output}' successfully installed.")
proc = subprocess.run(
["nix", "profile", "list", "--json"],
check=False,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
env=os.environ.copy(),
)
if proc.returncode != 0:
return {}
try:
return json.loads(proc.stdout or "{}")
except json.JSONDecodeError:
return {}
def _find_installed_indices_for_output(self, ctx: "RepoContext", output: str) -> List[int]:
"""
Find installed profile indices for a given output.
Works across Nix JSON variants:
- If elements are dicts: we can extract indices.
- If elements are strings: we cannot extract indices -> return [].
"""
data = self._profile_list_json(ctx)
elements = data.get("elements", []) or []
matches: List[int] = []
for el in elements:
# Legacy JSON format: plain strings -> no index information
if not isinstance(el, dict):
continue
print(f"[Error] Failed to install Nix flake output '{output}'")
print(f"[Error] Command exited with code {exit_code}")
idx = el.get("index")
if idx is None:
continue
if not allow_failure:
raise SystemExit(exit_code)
attr_path = el.get("attrPath") or el.get("attr_path") or ""
pname = el.get("pname") or ""
name = el.get("name") or ""
if attr_path == output:
matches.append(int(idx))
continue
if pname == output or name == output:
matches.append(int(idx))
continue
if isinstance(attr_path, str) and attr_path.endswith(f".{output}"):
matches.append(int(idx))
continue
return matches
def _upgrade_index(self, ctx: "RepoContext", index: int) -> bool:
cmd = f"nix profile upgrade --refresh {index}"
if not ctx.quiet:
print(f"[nix] upgrade: {cmd}")
res = self._run(ctx, cmd, allow_failure=True)
return res.returncode == 0
def _remove_index(self, ctx: "RepoContext", index: int) -> None:
cmd = f"nix profile remove {index}"
if not ctx.quiet:
print(f"[nix] remove: {cmd}")
self._run(ctx, cmd, allow_failure=True)
def _install_only(self, ctx: "RepoContext", output: str, allow_failure: bool) -> None:
"""
Install output; on failure, try index-based upgrade/remove+install if possible.
"""
installable = self._installable(ctx, output)
install_cmd = f"nix profile install {installable}"
if not ctx.quiet:
print(f"[nix] install: {install_cmd}")
res = self._run(ctx, install_cmd, allow_failure=True)
if res.returncode == 0:
if not ctx.quiet:
print(f"[nix] output '{output}' successfully installed.")
return
if not ctx.quiet:
print(
"[Warning] Continuing despite failure to install "
f"optional output '{output}'."
f"[nix] install failed for '{output}' (exit {res.returncode}), "
"trying index-based upgrade/remove+install..."
)
indices = self._find_installed_indices_for_output(ctx, output)
# 1) Try upgrading existing indices (only possible on newer JSON format)
upgraded = False
for idx in indices:
if self._upgrade_index(ctx, idx):
upgraded = True
if not ctx.quiet:
print(f"[nix] output '{output}' successfully upgraded (index {idx}).")
if upgraded:
return
# 2) Remove matching indices and retry install
if indices and not ctx.quiet:
print(f"[nix] upgrade failed; removing indices {indices} and reinstalling '{output}'.")
for idx in indices:
self._remove_index(ctx, idx)
final = self._run(ctx, install_cmd, allow_failure=True)
if final.returncode == 0:
if not ctx.quiet:
print(f"[nix] output '{output}' successfully re-installed.")
return
msg = f"[ERROR] Failed to install Nix flake output '{output}' (exit {final.returncode})"
print(msg)
if not allow_failure:
raise SystemExit(final.returncode)
print(f"[WARNING] Continuing despite failure of optional output '{output}'.")
def _force_upgrade_output(self, ctx: "RepoContext", output: str, allow_failure: bool) -> None:
"""
force_update path:
- Prefer upgrading existing entries via indices (if we can discover them).
- If no indices (legacy JSON) or upgrade fails, fall back to install-only logic.
"""
indices = self._find_installed_indices_for_output(ctx, output)
upgraded_any = False
for idx in indices:
if self._upgrade_index(ctx, idx):
upgraded_any = True
if not ctx.quiet:
print(f"[nix] output '{output}' successfully upgraded (index {idx}).")
if upgraded_any:
# Make upgrades visible to tests
print(f"[nix] output '{output}' successfully upgraded.")
return
if indices and not ctx.quiet:
print(f"[nix] upgrade failed; removing indices {indices} and reinstalling '{output}'.")
for idx in indices:
self._remove_index(ctx, idx)
# Ensure installed (includes its own fallback logic)
self._install_only(ctx, output, allow_failure)
# Make upgrades visible to tests (semantic: update requested)
print(f"[nix] output '{output}' successfully upgraded.")
def run(self, ctx: "RepoContext") -> None:
if not self.supports(ctx):
return
outputs = self._profile_outputs(ctx)
if not ctx.quiet:
print(
"[nix] flake detected in "
f"{ctx.identifier}, ensuring outputs: "
+ ", ".join(name for name, _ in outputs)
)
for output, allow_failure in outputs:
if ctx.force_update:
self._force_upgrade_output(ctx, output, allow_failure)
else:
self._install_only(ctx, output, allow_failure)

View File

@@ -1,104 +1,40 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
PythonInstaller — install Python projects defined via pyproject.toml.
Installation rules:
1. pip command resolution:
a) If PKGMGR_PIP is set → use it exactly as provided.
b) Else if running inside a virtualenv → use `sys.executable -m pip`.
c) Else → create/use a per-repository virtualenv under ~/.venvs/<repo>/.
2. Installation target:
- Always install into the resolved pip environment.
- Never modify system Python, never rely on --user.
- Nix-immutable systems (PEP 668) are automatically avoided because we
never touch system Python.
3. The installer is skipped when:
- PKGMGR_DISABLE_PYTHON_INSTALLER=1 is set.
- The repository has no pyproject.toml.
All pip failures are treated as fatal.
"""
# src/pkgmgr/actions/install/installers/python.py
from __future__ import annotations
import os
import sys
import subprocess
from typing import TYPE_CHECKING
from pkgmgr.actions.install.installers.base import BaseInstaller
from pkgmgr.actions.install.context import RepoContext
from pkgmgr.core.command.run import run_command
if TYPE_CHECKING:
from pkgmgr.actions.install.context import RepoContext
from pkgmgr.actions.install import InstallContext
class PythonInstaller(BaseInstaller):
"""Install Python projects and dependencies via pip using isolated environments."""
layer = "python"
# ----------------------------------------------------------------------
# Installer activation logic
# ----------------------------------------------------------------------
def supports(self, ctx: "RepoContext") -> bool:
"""
Return True if this installer should handle this repository.
The installer is active only when:
- A pyproject.toml exists in the repo, and
- PKGMGR_DISABLE_PYTHON_INSTALLER is not set.
"""
def supports(self, ctx: RepoContext) -> bool:
if os.environ.get("PKGMGR_DISABLE_PYTHON_INSTALLER") == "1":
print("[INFO] PythonInstaller disabled via PKGMGR_DISABLE_PYTHON_INSTALLER.")
return False
return os.path.exists(os.path.join(ctx.repo_dir, "pyproject.toml"))
# ----------------------------------------------------------------------
# Virtualenv handling
# ----------------------------------------------------------------------
def _in_virtualenv(self) -> bool:
"""Detect whether the current interpreter is inside a venv."""
if os.environ.get("VIRTUAL_ENV"):
return True
base = getattr(sys, "base_prefix", sys.prefix)
return sys.prefix != base
def _ensure_repo_venv(self, ctx: "InstallContext") -> str:
"""
Ensure that ~/.venvs/<identifier>/ exists and contains a minimal venv.
Returns the venv directory path.
"""
def _ensure_repo_venv(self, ctx: RepoContext) -> str:
venv_dir = os.path.expanduser(f"~/.venvs/{ctx.identifier}")
python = sys.executable
if not os.path.isdir(venv_dir):
print(f"[python-installer] Creating virtualenv: {venv_dir}")
subprocess.check_call([python, "-m", "venv", venv_dir])
if not os.path.exists(venv_dir):
run_command(f"{python} -m venv {venv_dir}", preview=ctx.preview)
return venv_dir
# ----------------------------------------------------------------------
# pip command resolution
# ----------------------------------------------------------------------
def _pip_cmd(self, ctx: "InstallContext") -> str:
"""
Determine which pip command to use.
Priority:
1. PKGMGR_PIP override given by user or automation.
2. Active virtualenv → use sys.executable -m pip.
3. Per-repository venv → ~/.venvs/<repo>/bin/pip
"""
def _pip_cmd(self, ctx: RepoContext) -> str:
explicit = os.environ.get("PKGMGR_PIP", "").strip()
if explicit:
return explicit
@@ -107,33 +43,19 @@ class PythonInstaller(BaseInstaller):
return f"{sys.executable} -m pip"
venv_dir = self._ensure_repo_venv(ctx)
pip_path = os.path.join(venv_dir, "bin", "pip")
return pip_path
return os.path.join(venv_dir, "bin", "pip")
# ----------------------------------------------------------------------
# Execution
# ----------------------------------------------------------------------
def run(self, ctx: "InstallContext") -> None:
"""
Install the project defined by pyproject.toml.
Uses the resolved pip environment. Installation is isolated and never
touches system Python.
"""
if not self.supports(ctx): # type: ignore[arg-type]
return
pyproject = os.path.join(ctx.repo_dir, "pyproject.toml")
if not os.path.exists(pyproject):
def run(self, ctx: RepoContext) -> None:
if not self.supports(ctx):
return
print(f"[python-installer] Installing Python project for {ctx.identifier}...")
pip_cmd = self._pip_cmd(ctx)
run_command(f"{pip_cmd} install .", cwd=ctx.repo_dir, preview=ctx.preview)
# Final install command: ALWAYS isolated, never system-wide.
install_cmd = f"{pip_cmd} install ."
run_command(install_cmd, cwd=ctx.repo_dir, preview=ctx.preview)
if ctx.force_update:
# test-visible marker
print(f"[python-installer] repo '{ctx.identifier}' successfully upgraded.")
print(f"[python-installer] Installation finished for {ctx.identifier}.")

View File

@@ -1,21 +1,9 @@
# src/pkgmgr/actions/install/pipeline.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Installation pipeline orchestration for repositories.
This module implements the "Setup Controller" logic:
1. Detect current CLI command for the repo (if any).
2. Classify it into a layer (os-packages, nix, python, makefile).
3. Iterate over installers in layer order:
- Skip installers whose layer is weaker than an already-loaded one.
- Run only installers that support() the repo and add new capabilities.
- After each installer, re-resolve the command and update the layer.
4. Maintain the repo["command"] field and create/update symlinks via create_ink().
The goal is to prevent conflicting installations and make the layering
behaviour explicit and testable.
"""
from __future__ import annotations
@@ -36,34 +24,15 @@ from pkgmgr.core.command.resolve import resolve_command_for_repo
@dataclass
class CommandState:
"""
Represents the current CLI state for a repository:
- command: absolute or relative path to the CLI entry point
- layer: which conceptual layer this command belongs to
"""
command: Optional[str]
layer: Optional[CliLayer]
class CommandResolver:
"""
Small helper responsible for resolving the current command for a repo
and mapping it into a CommandState.
"""
def __init__(self, ctx: RepoContext) -> None:
self._ctx = ctx
def resolve(self) -> CommandState:
"""
Resolve the current command for this repository.
If resolve_command_for_repo raises SystemExit (e.g. Python package
without installed entry point), we treat this as "no command yet"
from the point of view of the installers.
"""
repo = self._ctx.repo
identifier = self._ctx.identifier
repo_dir = self._ctx.repo_dir
@@ -85,28 +54,10 @@ class CommandResolver:
class InstallationPipeline:
"""
High-level orchestrator that applies a sequence of installers
to a repository based on CLI layer precedence.
"""
def __init__(self, installers: Sequence[BaseInstaller]) -> None:
self._installers = list(installers)
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
def run(self, ctx: RepoContext) -> None:
"""
Execute the installation pipeline for a single repository.
- Detect initial command & layer.
- Optionally create a symlink.
- Run installers in order, skipping those whose layer is weaker
than an already-loaded CLI.
- After each installer, re-resolve the command and refresh the
symlink if needed.
"""
repo = ctx.repo
repo_dir = ctx.repo_dir
identifier = ctx.identifier
@@ -119,7 +70,6 @@ class InstallationPipeline:
resolver = CommandResolver(ctx)
state = resolver.resolve()
# Persist initial command (if any) and create a symlink.
if state.command:
repo["command"] = state.command
create_ink(
@@ -135,11 +85,9 @@ class InstallationPipeline:
provided_capabilities: Set[str] = set()
# Main installer loop
for installer in self._installers:
layer_name = getattr(installer, "layer", None)
# Installers without a layer participate without precedence logic.
if layer_name is None:
self._run_installer(installer, ctx, identifier, repo_dir, quiet)
continue
@@ -147,17 +95,13 @@ class InstallationPipeline:
try:
installer_layer = CliLayer(layer_name)
except ValueError:
# Unknown layer string → treat as lowest priority.
installer_layer = None
# "Previous/Current layer already loaded?"
if state.layer is not None and installer_layer is not None:
current_prio = layer_priority(state.layer)
installer_prio = layer_priority(installer_layer)
if current_prio < installer_prio:
# Current CLI comes from a higher-priority layer,
# so we skip this installer entirely.
if not quiet:
print(
"[pkgmgr] Skipping installer "
@@ -166,9 +110,7 @@ class InstallationPipeline:
)
continue
if current_prio == installer_prio:
# Same layer already provides a CLI; usually there is no
# need to run another installer on top of it.
if current_prio == installer_prio and not ctx.force_update:
if not quiet:
print(
"[pkgmgr] Skipping installer "
@@ -177,12 +119,9 @@ class InstallationPipeline:
)
continue
# Check if this installer is applicable at all.
if not installer.supports(ctx):
continue
# Capabilities: if everything this installer would provide is already
# covered, we can safely skip it.
caps = installer.discover_capabilities(ctx)
if caps and caps.issubset(provided_capabilities):
if not quiet:
@@ -193,18 +132,22 @@ class InstallationPipeline:
continue
if not quiet:
print(
f"[pkgmgr] Running installer {installer.__class__.__name__} "
f"for {identifier} in '{repo_dir}' "
f"(new capabilities: {caps or set()})..."
)
if ctx.force_update and state.layer is not None and installer_layer == state.layer:
print(
f"[pkgmgr] Running installer {installer.__class__.__name__} "
f"for {identifier} in '{repo_dir}' (upgrade requested)..."
)
else:
print(
f"[pkgmgr] Running installer {installer.__class__.__name__} "
f"for {identifier} in '{repo_dir}' "
f"(new capabilities: {caps or set()})..."
)
# Run the installer with error reporting.
self._run_installer(installer, ctx, identifier, repo_dir, quiet)
provided_capabilities.update(caps)
# After running an installer, re-resolve the command and layer.
new_state = resolver.resolve()
if new_state.command:
repo["command"] = new_state.command
@@ -221,9 +164,6 @@ class InstallationPipeline:
state = new_state
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
@staticmethod
def _run_installer(
installer: BaseInstaller,
@@ -232,9 +172,6 @@ class InstallationPipeline:
repo_dir: str,
quiet: bool,
) -> None:
"""
Execute a single installer with unified error handling.
"""
try:
installer.run(ctx)
except SystemExit as exc:

View File

@@ -1,14 +1,121 @@
# src/pkgmgr/actions/mirror/setup_cmd.py
from __future__ import annotations
from typing import List, Tuple
from urllib.parse import urlparse
from pkgmgr.core.git import run_git, GitError
from pkgmgr.core.git import GitError, run_git
from pkgmgr.core.remote_provisioning import ProviderHint, RepoSpec, ensure_remote_repo
from pkgmgr.core.remote_provisioning.ensure import EnsureOptions
from .context import build_context
from .git_remote import determine_primary_remote_url, ensure_origin_remote
from .types import Repository
def _probe_mirror(url: str, repo_dir: str) -> Tuple[bool, str]:
"""
Probe a remote mirror URL using `git ls-remote`.
Returns:
(True, "") on success,
(False, error_message) on failure.
"""
try:
run_git(["ls-remote", url], cwd=repo_dir)
return True, ""
except GitError as exc:
return False, str(exc)
def _host_from_git_url(url: str) -> str:
url = (url or "").strip()
if not url:
return ""
if "://" in url:
parsed = urlparse(url)
netloc = (parsed.netloc or "").strip()
if "@" in netloc:
netloc = netloc.split("@", 1)[1]
# keep optional :port
return netloc
# scp-like: git@host:owner/repo.git
if "@" in url and ":" in url:
after_at = url.split("@", 1)[1]
host = after_at.split(":", 1)[0]
return host.strip()
return url.split("/", 1)[0].strip()
def _ensure_remote_repository(
repo: Repository,
repositories_base_dir: str,
all_repos: List[Repository],
preview: bool,
) -> None:
"""
Ensure that the remote repository exists using provider APIs.
This is ONLY called when ensure_remote=True.
"""
ctx = build_context(repo, repositories_base_dir, all_repos)
resolved_mirrors = ctx.resolved_mirrors
primary_url = determine_primary_remote_url(repo, resolved_mirrors)
if not primary_url:
print("[INFO] No remote URL could be derived; skipping remote provisioning.")
return
# IMPORTANT:
# - repo["provider"] is typically a provider *kind* (e.g. "github" / "gitea"),
# NOT a hostname. We derive the actual host from the remote URL.
host = _host_from_git_url(primary_url)
owner = repo.get("account")
name = repo.get("repository")
if not host or not owner or not name:
print("[WARN] Missing host/account/repository; cannot ensure remote repo.")
print(f" host={host!r}, account={owner!r}, repository={name!r}")
return
print("------------------------------------------------------------")
print(f"[REMOTE ENSURE] {ctx.identifier}")
print(f"[REMOTE ENSURE] host: {host}")
print("------------------------------------------------------------")
spec = RepoSpec(
host=str(host),
owner=str(owner),
name=str(name),
private=bool(repo.get("private", True)),
description=str(repo.get("description", "")),
)
provider_kind = str(repo.get("provider", "")).strip().lower() or None
try:
result = ensure_remote_repo(
spec,
provider_hint=ProviderHint(kind=provider_kind),
options=EnsureOptions(
preview=preview,
interactive=True,
allow_prompt=True,
save_prompt_token_to_keyring=True,
),
)
print(f"[REMOTE ENSURE] {result.status.upper()}: {result.message}")
if result.url:
print(f"[REMOTE ENSURE] URL: {result.url}")
except Exception as exc: # noqa: BLE001
# Keep action layer resilient
print(f"[ERROR] Remote provisioning failed: {exc}")
print()
def _setup_local_mirrors_for_repo(
repo: Repository,
repositories_base_dir: str,
@@ -16,7 +123,8 @@ def _setup_local_mirrors_for_repo(
preview: bool,
) -> None:
"""
Ensure local Git state is sane (currently: 'origin' remote).
Local setup:
- Ensure 'origin' remote exists and is sane
"""
ctx = build_context(repo, repositories_base_dir, all_repos)
@@ -29,103 +137,68 @@ def _setup_local_mirrors_for_repo(
print()
def _probe_mirror(url: str, repo_dir: str) -> Tuple[bool, str]:
"""
Probe a remote mirror by running `git ls-remote <url>`.
Returns:
(True, "") on success,
(False, error_message) on failure.
Wichtig:
- Wir werten ausschließlich den Exit-Code aus.
- STDERR kann Hinweise/Warnings enthalten und ist NICHT automatisch ein Fehler.
"""
try:
# Wir ignorieren stdout komplett; wichtig ist nur, dass der Befehl ohne
# GitError (also Exit-Code 0) durchläuft.
run_git(["ls-remote", url], cwd=repo_dir)
return True, ""
except GitError as exc:
return False, str(exc)
def _setup_remote_mirrors_for_repo(
repo: Repository,
repositories_base_dir: str,
all_repos: List[Repository],
preview: bool,
ensure_remote: bool,
) -> None:
"""
Remote-side setup / validation.
Aktuell werden nur **nicht-destruktive Checks** gemacht:
Default behavior:
- Non-destructive checks using `git ls-remote`.
- Für jeden Mirror (aus config + MIRRORS-Datei, file gewinnt):
* `git ls-remote <url>` wird ausgeführt.
* Bei Exit-Code 0 → [OK]
* Bei Fehler → [WARN] + Details aus der GitError-Exception
Es werden **keine** Provider-APIs aufgerufen und keine Repos angelegt.
Optional behavior:
- If ensure_remote=True:
* Attempt to create missing repositories via provider API
* Uses TokenResolver (ENV -> keyring -> prompt)
"""
ctx = build_context(repo, repositories_base_dir, all_repos)
resolved_m = ctx.resolved_mirrors
resolved_mirrors = ctx.resolved_mirrors
print("------------------------------------------------------------")
print(f"[MIRROR SETUP:REMOTE] {ctx.identifier}")
print(f"[MIRROR SETUP:REMOTE] dir: {ctx.repo_dir}")
print("------------------------------------------------------------")
if not resolved_m:
# Optional: Fallback auf eine heuristisch bestimmte URL, falls wir
# irgendwann "automatisch anlegen" implementieren wollen.
primary_url = determine_primary_remote_url(repo, resolved_m)
if ensure_remote:
_ensure_remote_repository(
repo,
repositories_base_dir=repositories_base_dir,
all_repos=all_repos,
preview=preview,
)
if not resolved_mirrors:
primary_url = determine_primary_remote_url(repo, resolved_mirrors)
if not primary_url:
print(
"[INFO] No mirrors configured (config or MIRRORS file), and no "
"primary URL could be derived from provider/account/repository."
)
print("[INFO] No mirrors configured and no primary URL available.")
print()
return
ok, error_message = _probe_mirror(primary_url, ctx.repo_dir)
if ok:
print(f"[OK] Remote mirror (primary) is reachable: {primary_url}")
print(f"[OK] primary: {primary_url}")
else:
print("[WARN] Primary remote URL is NOT reachable:")
print(f" {primary_url}")
if error_message:
print(" Details:")
for line in error_message.splitlines():
print(f" {line}")
print(f"[WARN] primary: {primary_url}")
for line in error_message.splitlines():
print(f" {line}")
print()
print(
"[INFO] Remote checks are non-destructive and only use `git ls-remote` "
"to probe mirror URLs."
)
print()
return
# Normaler Fall: wir haben benannte Mirrors aus config/MIRRORS
for name, url in sorted(resolved_m.items()):
for name, url in sorted(resolved_mirrors.items()):
ok, error_message = _probe_mirror(url, ctx.repo_dir)
if ok:
print(f"[OK] Remote mirror '{name}' is reachable: {url}")
print(f"[OK] {name}: {url}")
else:
print(f"[WARN] Remote mirror '{name}' is NOT reachable:")
print(f" {url}")
if error_message:
print(" Details:")
for line in error_message.splitlines():
print(f" {line}")
print(f"[WARN] {name}: {url}")
for line in error_message.splitlines():
print(f" {line}")
print()
print(
"[INFO] Remote checks are non-destructive and only use `git ls-remote` "
"to probe mirror URLs."
)
print()
def setup_mirrors(
@@ -135,22 +208,25 @@ def setup_mirrors(
preview: bool = False,
local: bool = True,
remote: bool = True,
ensure_remote: bool = False,
) -> None:
"""
Setup mirrors for the selected repositories.
local:
- Configure local Git remotes (currently: ensure 'origin' is present and
points to a reasonable URL).
- Configure local Git remotes (ensure 'origin' exists).
remote:
- Non-destructive remote checks using `git ls-remote` for each mirror URL.
Es werden keine Repositories auf dem Provider angelegt.
- Non-destructive remote checks using `git ls-remote`.
ensure_remote:
- If True, attempt to create missing remote repositories via provider APIs.
- This is explicit and NEVER enabled implicitly.
"""
for repo in selected_repos:
if local:
_setup_local_mirrors_for_repo(
repo,
repo=repo,
repositories_base_dir=repositories_base_dir,
all_repos=all_repos,
preview=preview,
@@ -158,8 +234,9 @@ def setup_mirrors(
if remote:
_setup_remote_mirrors_for_repo(
repo,
repo=repo,
repositories_base_dir=repositories_base_dir,
all_repos=all_repos,
preview=preview,
ensure_remote=ensure_remote,
)

View File

@@ -1,9 +1,12 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import subprocess
import sys
from pkgmgr.core.repository.identifier import get_repo_identifier
from pkgmgr.core.repository.dir import get_repo_dir
from pkgmgr.core.repository.identifier import get_repo_identifier
from pkgmgr.core.repository.verify import verify_repository
@@ -17,13 +20,6 @@ def pull_with_verification(
) -> None:
"""
Execute `git pull` for each repository with verification.
- Uses verify_repository() in "pull" mode.
- If verification fails (and verification info is set) and
--no-verification is not enabled, the user is prompted to confirm
the pull.
- In preview mode, no interactive prompts are performed and no
Git commands are executed; only the would-be command is printed.
"""
for repo in selected_repos:
repo_identifier = get_repo_identifier(repo, all_repos)
@@ -34,18 +30,13 @@ def pull_with_verification(
continue
verified_info = repo.get("verified")
verified_ok, errors, commit_hash, signing_key = verify_repository(
verified_ok, errors, _commit_hash, _signing_key = verify_repository(
repo,
repo_dir,
mode="pull",
no_verification=no_verification,
)
# Only prompt the user if:
# - we are NOT in preview mode
# - verification is enabled
# - the repo has verification info configured
# - verification failed
if (
not preview
and not no_verification
@@ -59,16 +50,14 @@ def pull_with_verification(
if choice != "y":
continue
# Build the git pull command (include extra args if present)
args_part = " ".join(extra_args) if extra_args else ""
full_cmd = f"git pull{(' ' + args_part) if args_part else ''}"
if preview:
# Preview mode: only show the command, do not execute or prompt.
print(f"[Preview] In '{repo_dir}': {full_cmd}")
else:
print(f"Running in '{repo_dir}': {full_cmd}")
result = subprocess.run(full_cmd, cwd=repo_dir, shell=True)
result = subprocess.run(full_cmd, cwd=repo_dir, shell=True, check=False)
if result.returncode != 0:
print(
f"'git pull' for {repo_identifier} failed "

View File

@@ -1,67 +0,0 @@
import shutil
from pkgmgr.actions.repository.pull import pull_with_verification
from pkgmgr.actions.install import install_repos
def update_repos(
selected_repos,
repositories_base_dir,
bin_dir,
all_repos,
no_verification,
system_update,
preview: bool,
quiet: bool,
update_dependencies: bool,
clone_mode: str,
):
"""
Update repositories by pulling latest changes and installing them.
Parameters:
- selected_repos: List of selected repositories.
- repositories_base_dir: Base directory for repositories.
- bin_dir: Directory for symbolic links.
- all_repos: All repository configurations.
- no_verification: Whether to skip verification.
- system_update: Whether to run system update.
- preview: If True, only show commands without executing.
- quiet: If True, suppress messages.
- update_dependencies: Whether to update dependent repositories.
- clone_mode: Method to clone repositories (ssh or https).
"""
pull_with_verification(
selected_repos,
repositories_base_dir,
all_repos,
[],
no_verification,
preview,
)
install_repos(
selected_repos,
repositories_base_dir,
bin_dir,
all_repos,
no_verification,
preview,
quiet,
clone_mode,
update_dependencies,
)
if system_update:
from pkgmgr.core.command.run import run_command
# Nix: upgrade all profile entries (if Nix is available)
if shutil.which("nix") is not None:
try:
run_command("nix profile upgrade '.*'", preview=preview)
except SystemExit as e:
print(f"[Warning] 'nix profile upgrade' failed: {e}")
# Arch / AUR system update
run_command("sudo -u aur_builder yay -Syu --noconfirm", preview=preview)
run_command("sudo pacman -Syyu --noconfirm", preview=preview)

View File

@@ -0,0 +1,10 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
from pkgmgr.actions.update.manager import UpdateManager
__all__ = [
"UpdateManager",
]

View File

@@ -0,0 +1,61 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import Any, Iterable
from pkgmgr.actions.update.system_updater import SystemUpdater
class UpdateManager:
"""
Orchestrates:
- repository pull + installation
- optional system update
"""
def __init__(self) -> None:
self._system_updater = SystemUpdater()
def run(
self,
selected_repos: Iterable[Any],
repositories_base_dir: str,
bin_dir: str,
all_repos: Any,
no_verification: bool,
system_update: bool,
preview: bool,
quiet: bool,
update_dependencies: bool,
clone_mode: str,
force_update: bool = True,
) -> None:
from pkgmgr.actions.install import install_repos
from pkgmgr.actions.repository.pull import pull_with_verification
pull_with_verification(
selected_repos,
repositories_base_dir,
all_repos,
[],
no_verification,
preview,
)
install_repos(
selected_repos,
repositories_base_dir,
bin_dir,
all_repos,
no_verification,
preview,
quiet,
clone_mode,
update_dependencies,
force_update=force_update,
)
if system_update:
self._system_updater.run(preview=preview)

View File

@@ -0,0 +1,66 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import os
from dataclasses import dataclass
from typing import Dict
def read_os_release(path: str = "/etc/os-release") -> Dict[str, str]:
"""
Parse /etc/os-release into a dict. Returns empty dict if missing.
"""
if not os.path.exists(path):
return {}
result: Dict[str, str] = {}
with open(path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, value = line.split("=", 1)
result[key.strip()] = value.strip().strip('"')
return result
@dataclass(frozen=True)
class OSReleaseInfo:
"""
Minimal /etc/os-release representation for distro detection.
"""
id: str = ""
id_like: str = ""
pretty_name: str = ""
@staticmethod
def load() -> "OSReleaseInfo":
data = read_os_release()
return OSReleaseInfo(
id=(data.get("ID") or "").lower(),
id_like=(data.get("ID_LIKE") or "").lower(),
pretty_name=(data.get("PRETTY_NAME") or ""),
)
def ids(self) -> set[str]:
ids: set[str] = set()
if self.id:
ids.add(self.id)
if self.id_like:
for part in self.id_like.split():
ids.add(part.strip())
return ids
def is_arch_family(self) -> bool:
ids = self.ids()
return ("arch" in ids) or ("archlinux" in ids)
def is_debian_family(self) -> bool:
ids = self.ids()
return bool(ids.intersection({"debian", "ubuntu"}))
def is_fedora_family(self) -> bool:
ids = self.ids()
return bool(ids.intersection({"fedora", "rhel", "centos", "rocky", "almalinux"}))

View File

@@ -0,0 +1,96 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import platform
import shutil
from pkgmgr.actions.update.os_release import OSReleaseInfo
class SystemUpdater:
"""
Executes distro-specific system update commands, plus Nix profile upgrades if available.
"""
def run(self, *, preview: bool) -> None:
from pkgmgr.core.command.run import run_command
# Distro-agnostic: Nix profile upgrades (if Nix is present).
if shutil.which("nix") is not None:
try:
run_command("nix profile upgrade '.*'", preview=preview)
except SystemExit as e:
print(f"[Warning] 'nix profile upgrade' failed: {e}")
osr = OSReleaseInfo.load()
if osr.is_arch_family():
self._update_arch(preview=preview)
return
if osr.is_debian_family():
self._update_debian(preview=preview)
return
if osr.is_fedora_family():
self._update_fedora(preview=preview)
return
distro = osr.pretty_name or platform.platform()
print(f"[Warning] Unsupported distribution for system update: {distro}")
def _update_arch(self, *, preview: bool) -> None:
from pkgmgr.core.command.run import run_command
yay = shutil.which("yay")
pacman = shutil.which("pacman")
sudo = shutil.which("sudo")
# Prefer yay if available (repo + AUR in one pass).
# Avoid running yay and pacman afterwards to prevent double update passes.
if yay and sudo:
run_command("sudo -u aur_builder yay -Syu --noconfirm", preview=preview)
return
if pacman and sudo:
run_command("sudo pacman -Syu --noconfirm", preview=preview)
return
print("[Warning] Cannot update Arch system: missing required tools (sudo/yay/pacman).")
def _update_debian(self, *, preview: bool) -> None:
from pkgmgr.core.command.run import run_command
sudo = shutil.which("sudo")
apt_get = shutil.which("apt-get")
if not (sudo and apt_get):
print("[Warning] Cannot update Debian/Ubuntu system: missing required tools (sudo/apt-get).")
return
env = "DEBIAN_FRONTEND=noninteractive"
run_command(f"sudo {env} apt-get update -y", preview=preview)
run_command(f"sudo {env} apt-get -y dist-upgrade", preview=preview)
def _update_fedora(self, *, preview: bool) -> None:
from pkgmgr.core.command.run import run_command
sudo = shutil.which("sudo")
dnf = shutil.which("dnf")
microdnf = shutil.which("microdnf")
if not sudo:
print("[Warning] Cannot update Fedora/RHEL-like system: missing sudo.")
return
if dnf:
run_command("sudo dnf -y upgrade", preview=preview)
return
if microdnf:
run_command("sudo microdnf -y upgrade", preview=preview)
return
print("[Warning] Cannot update Fedora/RHEL-like system: missing dnf/microdnf.")

View File

@@ -1,32 +1,30 @@
# src/pkgmgr/cli/commands/mirror.py
from __future__ import annotations
import sys
from typing import Any, Dict, List
from pkgmgr.actions.mirror import (
diff_mirrors,
list_mirrors,
merge_mirrors,
setup_mirrors,
)
from pkgmgr.actions.mirror import diff_mirrors, list_mirrors, merge_mirrors, setup_mirrors
from pkgmgr.cli.context import CLIContext
Repository = Dict[str, Any]
def handle_mirror_command(
args,
ctx: CLIContext,
args: Any,
selected: List[Repository],
) -> None:
"""
Entry point for 'pkgmgr mirror' subcommands.
Subcommands:
- mirror list → list configured mirrors
- mirror diff → compare config vs MIRRORS file
- mirror merge → merge mirrors between config and MIRRORS file
- mirror setup → configure local Git + remote placeholders
- mirror list
- mirror diff
- mirror merge
- mirror setup
- mirror check
- mirror provision
"""
if not selected:
print("[INFO] No repositories selected for 'mirror' command.")
@@ -34,9 +32,6 @@ def handle_mirror_command(
subcommand = getattr(args, "subcommand", None)
# ------------------------------------------------------------
# mirror list
# ------------------------------------------------------------
if subcommand == "list":
source = getattr(args, "source", "all")
list_mirrors(
@@ -47,9 +42,6 @@ def handle_mirror_command(
)
return
# ------------------------------------------------------------
# mirror diff
# ------------------------------------------------------------
if subcommand == "diff":
diff_mirrors(
selected_repos=selected,
@@ -58,27 +50,17 @@ def handle_mirror_command(
)
return
# ------------------------------------------------------------
# mirror merge
# ------------------------------------------------------------
if subcommand == "merge":
source = getattr(args, "source", None)
target = getattr(args, "target", None)
preview = getattr(args, "preview", False)
if source == target:
print(
"[ERROR] For 'mirror merge', source and target "
"must differ (one of: config, file)."
)
print("[ERROR] For 'mirror merge', source and target must differ (config vs file).")
sys.exit(2)
# Config file path can be passed explicitly via --config-path.
# If not given, fall back to the global context (if available).
explicit_config_path = getattr(args, "config_path", None)
user_config_path = explicit_config_path or getattr(
ctx, "user_config_path", None
)
user_config_path = explicit_config_path or getattr(ctx, "user_config_path", None)
merge_mirrors(
selected_repos=selected,
@@ -91,26 +73,42 @@ def handle_mirror_command(
)
return
# ------------------------------------------------------------
# mirror setup
# ------------------------------------------------------------
if subcommand == "setup":
local = getattr(args, "local", False)
remote = getattr(args, "remote", False)
preview = getattr(args, "preview", False)
# If neither flag is set → default to both.
if not local and not remote:
local = True
remote = True
setup_mirrors(
selected_repos=selected,
repositories_base_dir=ctx.repositories_base_dir,
all_repos=ctx.all_repositories,
preview=preview,
local=local,
remote=remote,
local=True,
remote=False,
ensure_remote=False,
)
return
if subcommand == "check":
preview = getattr(args, "preview", False)
setup_mirrors(
selected_repos=selected,
repositories_base_dir=ctx.repositories_base_dir,
all_repos=ctx.all_repositories,
preview=preview,
local=False,
remote=True,
ensure_remote=False,
)
return
if subcommand == "provision":
preview = getattr(args, "preview", False)
setup_mirrors(
selected_repos=selected,
repositories_base_dir=ctx.repositories_base_dir,
all_repos=ctx.all_repositories,
preview=preview,
local=False,
remote=True,
ensure_remote=True,
)
return

View File

@@ -10,11 +10,10 @@ from pkgmgr.cli.context import CLIContext
from pkgmgr.actions.install import install_repos
from pkgmgr.actions.repository.deinstall import deinstall_repos
from pkgmgr.actions.repository.delete import delete_repos
from pkgmgr.actions.repository.update import update_repos
from pkgmgr.actions.repository.status import status_repos
from pkgmgr.actions.repository.list import list_repositories
from pkgmgr.core.command.run import run_command
from pkgmgr.actions.repository.create import create_repo
from pkgmgr.core.command.run import run_command
from pkgmgr.core.repository.dir import get_repo_dir
Repository = Dict[str, Any]
@@ -51,7 +50,7 @@ def handle_repos_command(
selected: List[Repository],
) -> None:
"""
Handle core repository commands (install/update/deinstall/delete/.../list).
Handle core repository commands (install/update/deinstall/delete/status/list/path/shell/create).
"""
# ------------------------------------------------------------
@@ -68,24 +67,7 @@ def handle_repos_command(
args.quiet,
args.clone_mode,
args.dependencies,
)
return
# ------------------------------------------------------------
# update
# ------------------------------------------------------------
if args.command == "update":
update_repos(
selected,
ctx.repositories_base_dir,
ctx.binaries_dir,
ctx.all_repositories,
args.no_verification,
args.system,
args.preview,
args.quiet,
args.dependencies,
args.clone_mode,
force_update=getattr(args, "update", False),
)
return
@@ -146,9 +128,7 @@ def handle_repos_command(
f"{repository.get('account', '?')}/"
f"{repository.get('repository', '?')}"
)
print(
f"[WARN] Could not resolve directory for {ident}: {exc}"
)
print(f"[WARN] Could not resolve directory for {ident}: {exc}")
continue
print(repo_dir)

View File

@@ -129,7 +129,6 @@ def dispatch_command(args, ctx: CLIContext) -> None:
# ------------------------------------------------------------------ #
if args.command in (
"install",
"update",
"deinstall",
"delete",
"status",
@@ -141,6 +140,27 @@ def dispatch_command(args, ctx: CLIContext) -> None:
handle_repos_command(args, ctx, selected)
return
# ------------------------------------------------------------
# update
# ------------------------------------------------------------
if args.command == "update":
from pkgmgr.actions.update import UpdateManager
UpdateManager().run(
selected_repos=selected,
repositories_base_dir=ctx.repositories_base_dir,
bin_dir=ctx.binaries_dir,
all_repos=ctx.all_repositories,
no_verification=args.no_verification,
system_update=args.system,
preview=args.preview,
quiet=args.quiet,
update_dependencies=args.dependencies,
clone_mode=args.clone_mode,
force_update=True,
)
return
# ------------------------------------------------------------------ #
# Tools (explore / terminal / code)
# ------------------------------------------------------------------ #
@@ -176,7 +196,7 @@ def dispatch_command(args, ctx: CLIContext) -> None:
return
if args.command == "mirror":
handle_mirror_command(args, ctx, selected)
handle_mirror_command(ctx, args, selected)
return
print(f"Unknown command: {args.command}")

View File

@@ -1,96 +1,134 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# src/pkgmgr/cli/parser/common.py
from __future__ import annotations
import argparse
from typing import Optional, Tuple
class SortedSubParsersAction(argparse._SubParsersAction):
"""
Subparsers action that keeps choices sorted alphabetically.
Subparsers action that keeps subcommands sorted alphabetically.
"""
def add_parser(self, name, **kwargs):
parser = super().add_parser(name, **kwargs)
# Sort choices alphabetically by dest (subcommand name)
self._choices_actions.sort(key=lambda a: a.dest)
return parser
def _has_action(
parser: argparse.ArgumentParser,
*,
positional: Optional[str] = None,
options: Tuple[str, ...] = (),
) -> bool:
"""
Check whether the parser already has an action.
- positional: name of a positional argument (e.g. "identifiers")
- options: option strings (e.g. "--preview", "-q")
"""
for action in parser._actions:
if positional and action.dest == positional:
return True
if options and any(opt in action.option_strings for opt in options):
return True
return False
def _add_positional_if_missing(
parser: argparse.ArgumentParser,
name: str,
**kwargs,
) -> None:
"""Safely add a positional argument."""
if _has_action(parser, positional=name):
return
parser.add_argument(name, **kwargs)
def _add_option_if_missing(
parser: argparse.ArgumentParser,
*option_strings: str,
**kwargs,
) -> None:
"""Safely add an optional argument."""
if _has_action(parser, options=tuple(option_strings)):
return
parser.add_argument(*option_strings, **kwargs)
def add_identifier_arguments(subparser: argparse.ArgumentParser) -> None:
"""
Common identifier / selection arguments for many subcommands.
Selection modes (mutual intent, not hard-enforced):
- identifiers (positional): select by alias / provider/account/repo
- --all: select all repositories
- --category / --string / --tag: filter-based selection on top
of the full repository set
"""
subparser.add_argument(
_add_positional_if_missing(
subparser,
"identifiers",
nargs="*",
help=(
"Identifier(s) for repositories. "
"Default: Repository of current folder."
"Default: repository of the current working directory."
),
)
subparser.add_argument(
_add_option_if_missing(
subparser,
"--all",
action="store_true",
default=False,
help=(
"Apply the subcommand to all repositories in the config. "
"Some subcommands ask for confirmation. If you want to give this "
"confirmation for all repositories, pipe 'yes'. E.g: "
"yes | pkgmgr {subcommand} --all"
"Pipe 'yes' to auto-confirm. Example:\n"
" yes | pkgmgr <command> --all"
),
)
subparser.add_argument(
_add_option_if_missing(
subparser,
"--category",
nargs="+",
default=[],
help=(
"Filter repositories by category patterns derived from config "
"filenames or repo metadata (use filename without .yml/.yaml, "
"or /regex/ to use a regular expression)."
),
help="Filter repositories by category (supports /regex/).",
)
subparser.add_argument(
_add_option_if_missing(
subparser,
"--string",
default="",
help=(
"Filter repositories whose identifier / name / path contains this "
"substring (case-insensitive). Use /regex/ for regular expressions."
),
help="Filter repositories by substring or /regex/.",
)
subparser.add_argument(
_add_option_if_missing(
subparser,
"--tag",
action="append",
default=[],
help=(
"Filter repositories by tag. Matches tags from the repository "
"collector and category tags. Use /regex/ for regular expressions."
),
help="Filter repositories by tag (supports /regex/).",
)
subparser.add_argument(
_add_option_if_missing(
subparser,
"--preview",
action="store_true",
help="Preview changes without executing commands",
help="Preview changes without executing commands.",
)
subparser.add_argument(
_add_option_if_missing(
subparser,
"--list",
action="store_true",
help="List affected repositories (with preview or status)",
help="List affected repositories.",
)
subparser.add_argument(
_add_option_if_missing(
subparser,
"-a",
"--args",
nargs=argparse.REMAINDER,
dest="extra_args",
help="Additional parameters to be attached.",
nargs=argparse.REMAINDER,
default=[],
help="Additional parameters to be attached.",
)
@@ -99,29 +137,34 @@ def add_install_update_arguments(subparser: argparse.ArgumentParser) -> None:
Common arguments for install/update commands.
"""
add_identifier_arguments(subparser)
subparser.add_argument(
_add_option_if_missing(
subparser,
"-q",
"--quiet",
action="store_true",
help="Suppress warnings and info messages",
help="Suppress warnings and info messages.",
)
subparser.add_argument(
_add_option_if_missing(
subparser,
"--no-verification",
action="store_true",
default=False,
help="Disable verification via commit/gpg",
help="Disable verification via commit / GPG.",
)
subparser.add_argument(
_add_option_if_missing(
subparser,
"--dependencies",
action="store_true",
help="Also pull and update dependencies",
help="Also pull and update dependencies.",
)
subparser.add_argument(
_add_option_if_missing(
subparser,
"--clone-mode",
choices=["ssh", "https", "shallow"],
default="ssh",
help=(
"Specify the clone mode: ssh, https, or shallow "
"(HTTPS shallow clone; default: ssh)"
),
help="Specify clone mode (default: ssh).",
)

View File

@@ -1,11 +1,12 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import argparse
from .common import add_install_update_arguments, add_identifier_arguments
from pkgmgr.cli.parser.common import (
add_install_update_arguments,
add_identifier_arguments,
)
def add_install_update_subparsers(
@@ -14,11 +15,17 @@ def add_install_update_subparsers(
"""
Register install / update / deinstall / delete commands.
"""
install_parser = subparsers.add_parser(
"install",
help="Setup repository/repositories alias links to executables",
)
add_install_update_arguments(install_parser)
install_parser.add_argument(
"--update",
action="store_true",
help="Force re-run installers (upgrade/refresh) even if the CLI layer is already loaded",
)
update_parser = subparsers.add_parser(
"update",
@@ -27,9 +34,11 @@ def add_install_update_subparsers(
add_install_update_arguments(update_parser)
update_parser.add_argument(
"--system",
dest="system",
action="store_true",
help="Include system update commands",
)
# No --update here: update implies force_update=True
deinstall_parser = subparsers.add_parser(
"deinstall",

View File

@@ -1,3 +1,4 @@
# src/pkgmgr/cli/parser/mirror_cmd.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
@@ -8,103 +9,55 @@ import argparse
from .common import add_identifier_arguments
def add_mirror_subparsers(
subparsers: argparse._SubParsersAction,
) -> None:
"""
Register mirror command and its subcommands (list, diff, merge, setup).
"""
def add_mirror_subparsers(subparsers: argparse._SubParsersAction) -> None:
mirror_parser = subparsers.add_parser(
"mirror",
help="Mirror-related utilities (list, diff, merge, setup)",
help="Mirror-related utilities (list, diff, merge, setup, check, provision)",
)
mirror_subparsers = mirror_parser.add_subparsers(
dest="subcommand",
help="Mirror subcommands",
metavar="SUBCOMMAND",
required=True,
)
# ------------------------------------------------------------------
# mirror list
# ------------------------------------------------------------------
mirror_list = mirror_subparsers.add_parser(
"list",
help="List configured mirrors for repositories",
)
mirror_list = mirror_subparsers.add_parser("list", help="List configured mirrors for repositories")
add_identifier_arguments(mirror_list)
mirror_list.add_argument(
"--source",
choices=["all", "config", "file", "resolved"],
choices=["config", "file", "all"],
default="all",
help="Which mirror source to show.",
)
# ------------------------------------------------------------------
# mirror diff
# ------------------------------------------------------------------
mirror_diff = mirror_subparsers.add_parser(
"diff",
help="Show differences between config mirrors and MIRRORS file",
)
mirror_diff = mirror_subparsers.add_parser("diff", help="Show differences between config mirrors and MIRRORS file")
add_identifier_arguments(mirror_diff)
# ------------------------------------------------------------------
# mirror merge {config,file} {config,file}
# ------------------------------------------------------------------
mirror_merge = mirror_subparsers.add_parser(
"merge",
help=(
"Merge mirrors between config and MIRRORS file "
"(example: pkgmgr mirror merge config file --all)"
),
help="Merge mirrors between config and MIRRORS file (example: pkgmgr mirror merge config file --all)",
)
# First define merge direction positionals, then selection args.
mirror_merge.add_argument(
"source",
choices=["config", "file"],
help="Source of mirrors.",
)
mirror_merge.add_argument(
"target",
choices=["config", "file"],
help="Target of mirrors.",
)
# Selection / filter / preview arguments
mirror_merge.add_argument("source", choices=["config", "file"], help="Source of mirrors.")
mirror_merge.add_argument("target", choices=["config", "file"], help="Target of mirrors.")
add_identifier_arguments(mirror_merge)
mirror_merge.add_argument(
"--config-path",
help=(
"Path to the user config file to update. "
"If omitted, the global config path is used."
),
help="Path to the user config file to update. If omitted, the global config path is used.",
)
# Note: --preview, --all, --category, --tag, --list, etc. are provided
# by add_identifier_arguments().
# ------------------------------------------------------------------
# mirror setup
# ------------------------------------------------------------------
mirror_setup = mirror_subparsers.add_parser(
"setup",
help=(
"Setup mirror configuration for repositories.\n"
" --local → configure local Git (remotes, pushurls)\n"
" --remote → create remote repositories if missing\n"
"Default: both local and remote."
),
help="Configure local Git remotes and push URLs (origin, pushurl list).",
)
add_identifier_arguments(mirror_setup)
mirror_setup.add_argument(
"--local",
action="store_true",
help="Only configure the local Git repository.",
mirror_check = mirror_subparsers.add_parser(
"check",
help="Check remote mirror reachability (git ls-remote). Read-only.",
)
mirror_setup.add_argument(
"--remote",
action="store_true",
help="Only operate on remote repositories.",
add_identifier_arguments(mirror_check)
mirror_provision = mirror_subparsers.add_parser(
"provision",
help="Provision remote repositories via provider APIs (create missing repos).",
)
# Note: --preview also comes from add_identifier_arguments().
add_identifier_arguments(mirror_provision)

View File

@@ -0,0 +1,30 @@
# src/pkgmgr/core/command/layer.py
from __future__ import annotations
from enum import Enum
class CliLayer(str, Enum):
"""
CLI layer precedence (lower number = stronger layer).
"""
OS_PACKAGES = "os-packages"
NIX = "nix"
PYTHON = "python"
MAKEFILE = "makefile"
_LAYER_PRIORITY: dict[CliLayer, int] = {
CliLayer.OS_PACKAGES: 0,
CliLayer.NIX: 1,
CliLayer.PYTHON: 2,
CliLayer.MAKEFILE: 3,
}
def layer_priority(layer: CliLayer) -> int:
"""
Return precedence priority for the given layer.
Lower value means higher priority (stronger layer).
"""
return _LAYER_PRIORITY.get(layer, 999)

View File

@@ -0,0 +1,21 @@
# src/pkgmgr/core/credentials/__init__.py
"""Credential resolution for provider APIs."""
from .resolver import ResolutionOptions, TokenResolver
from .types import (
CredentialError,
KeyringUnavailableError,
NoCredentialsError,
TokenRequest,
TokenResult,
)
__all__ = [
"TokenResolver",
"ResolutionOptions",
"CredentialError",
"NoCredentialsError",
"KeyringUnavailableError",
"TokenRequest",
"TokenResult",
]

View File

@@ -0,0 +1,11 @@
"""Credential providers used by TokenResolver."""
from .env import EnvTokenProvider
from .keyring import KeyringTokenProvider
from .prompt import PromptTokenProvider
__all__ = [
"EnvTokenProvider",
"KeyringTokenProvider",
"PromptTokenProvider",
]

View File

@@ -0,0 +1,23 @@
# src/pkgmgr/core/credentials/providers/env.py
from __future__ import annotations
import os
from dataclasses import dataclass
from typing import Optional
from ..store_keys import env_var_candidates
from ..types import TokenRequest, TokenResult
@dataclass(frozen=True)
class EnvTokenProvider:
"""Resolve tokens from environment variables."""
source_name: str = "env"
def get(self, request: TokenRequest) -> Optional[TokenResult]:
for key in env_var_candidates(request.provider_kind, request.host, request.owner):
val = os.environ.get(key)
if val:
return TokenResult(token=val.strip(), source=self.source_name)
return None

View File

@@ -0,0 +1,39 @@
# src/pkgmgr/core/credentials/providers/keyring.py
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional
from ..store_keys import build_keyring_key
from ..types import KeyringUnavailableError, TokenRequest, TokenResult
def _import_keyring():
try:
import keyring # type: ignore
return keyring
except Exception as exc: # noqa: BLE001
raise KeyringUnavailableError(
"python-keyring is not available or no backend is configured."
) from exc
@dataclass(frozen=True)
class KeyringTokenProvider:
"""Resolve/store tokens from/to OS keyring via python-keyring."""
source_name: str = "keyring"
def get(self, request: TokenRequest) -> Optional[TokenResult]:
keyring = _import_keyring()
key = build_keyring_key(request.provider_kind, request.host, request.owner)
token = keyring.get_password(key.service, key.username)
if token:
return TokenResult(token=token.strip(), source=self.source_name)
return None
def set(self, request: TokenRequest, token: str) -> None:
keyring = _import_keyring()
key = build_keyring_key(request.provider_kind, request.host, request.owner)
keyring.set_password(key.service, key.username, token)

View File

@@ -0,0 +1,32 @@
# src/pkgmgr/core/credentials/providers/prompt.py
from __future__ import annotations
import sys
from dataclasses import dataclass
from getpass import getpass
from typing import Optional
from ..types import TokenRequest, TokenResult
@dataclass(frozen=True)
class PromptTokenProvider:
"""Interactively prompt for a token.
Only used when:
- interactive mode is enabled
- stdin is a TTY
"""
source_name: str = "prompt"
def get(self, request: TokenRequest) -> Optional[TokenResult]:
if not sys.stdin.isatty():
return None
owner_info = f" (owner: {request.owner})" if request.owner else ""
prompt = f"Enter API token for {request.provider_kind} on {request.host}{owner_info}: "
token = (getpass(prompt) or "").strip()
if not token:
return None
return TokenResult(token=token, source=self.source_name)

View File

@@ -0,0 +1,71 @@
# src/pkgmgr/core/credentials/resolver.py
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional
from .providers.env import EnvTokenProvider
from .providers.keyring import KeyringTokenProvider
from .providers.prompt import PromptTokenProvider
from .types import NoCredentialsError, TokenRequest, TokenResult
@dataclass(frozen=True)
class ResolutionOptions:
"""Controls token resolution behavior."""
interactive: bool = True
allow_prompt: bool = True
save_prompt_token_to_keyring: bool = True
class TokenResolver:
"""Resolve tokens from multiple sources (ENV -> Keyring -> Prompt)."""
def __init__(self) -> None:
self._env = EnvTokenProvider()
self._keyring = KeyringTokenProvider()
self._prompt = PromptTokenProvider()
def get_token(
self,
provider_kind: str,
host: str,
owner: Optional[str] = None,
options: Optional[ResolutionOptions] = None,
) -> TokenResult:
opts = options or ResolutionOptions()
request = TokenRequest(provider_kind=provider_kind, host=host, owner=owner)
# 1) ENV
env_res = self._env.get(request)
if env_res:
return env_res
# 2) Keyring
try:
kr_res = self._keyring.get(request)
if kr_res:
return kr_res
except Exception:
# Keyring missing/unavailable: ignore to allow prompt (workstations)
# or to fail cleanly below (headless CI without prompt).
pass
# 3) Prompt (optional)
if opts.interactive and opts.allow_prompt:
prompt_res = self._prompt.get(request)
if prompt_res:
if opts.save_prompt_token_to_keyring:
try:
self._keyring.set(request, prompt_res.token)
except Exception:
# If keyring cannot store, still use token for this run.
pass
return prompt_res
raise NoCredentialsError(
f"No token available for {provider_kind}@{host}"
+ (f" (owner: {owner})" if owner else "")
+ ". Provide it via environment variable or keyring."
)

View File

@@ -0,0 +1,54 @@
# src/pkgmgr/core/credentials/store_keys.py
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional
@dataclass(frozen=True)
class KeyringKey:
"""Keyring address for a token."""
service: str
username: str
def build_keyring_key(provider_kind: str, host: str, owner: Optional[str]) -> KeyringKey:
"""Build a stable keyring key.
- service: "pkgmgr:<provider>"
- username: "<host>|<owner>" or "<host>|-"
"""
provider_kind = str(provider_kind).strip().lower()
host = str(host).strip()
owner_part = (str(owner).strip() if owner else "-")
return KeyringKey(service=f"pkgmgr:{provider_kind}", username=f"{host}|{owner_part}")
def env_var_candidates(provider_kind: str, host: str, owner: Optional[str]) -> list[str]:
"""Return a list of environment variable names to try.
Order is from most specific to most generic.
"""
kind = re_sub_non_alnum(str(provider_kind).strip().upper())
host_norm = re_sub_non_alnum(str(host).strip().upper())
candidates: list[str] = []
if owner:
owner_norm = re_sub_non_alnum(str(owner).strip().upper())
candidates.append(f"PKGMGR_{kind}_TOKEN_{host_norm}_{owner_norm}")
candidates.append(f"PKGMGR_TOKEN_{kind}_{host_norm}_{owner_norm}")
candidates.append(f"PKGMGR_{kind}_TOKEN_{host_norm}")
candidates.append(f"PKGMGR_TOKEN_{kind}_{host_norm}")
candidates.append(f"PKGMGR_{kind}_TOKEN")
candidates.append(f"PKGMGR_TOKEN_{kind}")
candidates.append("PKGMGR_TOKEN")
return candidates
def re_sub_non_alnum(value: str) -> str:
"""Normalize to an uppercase env-var friendly token (A-Z0-9_)."""
import re
return re.sub(r"[^A-Z0-9]+", "_", value).strip("_")

View File

@@ -0,0 +1,34 @@
# src/pkgmgr/core/credentials/types.py
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional
class CredentialError(RuntimeError):
"""Base class for credential resolution errors."""
class NoCredentialsError(CredentialError):
"""Raised when no usable credential could be resolved."""
class KeyringUnavailableError(CredentialError):
"""Raised when keyring is requested but no backend is available."""
@dataclass(frozen=True)
class TokenRequest:
"""Parameters describing which token we need."""
provider_kind: str # e.g. "gitea", "github"
host: str # e.g. "git.example.org" or "github.com"
owner: Optional[str] = None # optional org/user
@dataclass(frozen=True)
class TokenResult:
"""A resolved token plus metadata about its source."""
token: str
source: str # "env" | "keyring" | "prompt"

View File

@@ -0,0 +1,14 @@
# src/pkgmgr/core/remote_provisioning/__init__.py
"""Remote repository provisioning (ensure remote repo exists)."""
from .ensure import ensure_remote_repo
from .registry import ProviderRegistry
from .types import EnsureResult, ProviderHint, RepoSpec
__all__ = [
"ensure_remote_repo",
"RepoSpec",
"EnsureResult",
"ProviderHint",
"ProviderRegistry",
]

View File

@@ -0,0 +1,97 @@
# src/pkgmgr/core/remote_provisioning/ensure.py
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional
from pkgmgr.core.credentials.resolver import ResolutionOptions, TokenResolver
from .http.errors import HttpError
from .registry import ProviderRegistry
from .types import (
AuthError,
EnsureResult,
NetworkError,
PermissionError,
ProviderHint,
RepoSpec,
UnsupportedProviderError,
)
@dataclass(frozen=True)
class EnsureOptions:
"""Options controlling remote provisioning."""
preview: bool = False
interactive: bool = True
allow_prompt: bool = True
save_prompt_token_to_keyring: bool = True
def _raise_mapped_http_error(exc: HttpError, host: str) -> None:
"""Map HttpError into domain-specific error types."""
if exc.status == 0:
raise NetworkError(f"Network error while talking to {host}: {exc}") from exc
if exc.status == 401:
raise AuthError(f"Authentication failed for {host} (401).") from exc
if exc.status == 403:
raise PermissionError(f"Permission denied for {host} (403).") from exc
raise NetworkError(
f"HTTP error from {host}: status={exc.status}, message={exc}, body={exc.body}"
) from exc
def ensure_remote_repo(
spec: RepoSpec,
provider_hint: Optional[ProviderHint] = None,
options: Optional[EnsureOptions] = None,
registry: Optional[ProviderRegistry] = None,
token_resolver: Optional[TokenResolver] = None,
) -> EnsureResult:
"""Ensure that the remote repository exists (create if missing).
- Uses TokenResolver (ENV -> keyring -> prompt)
- Selects provider via ProviderRegistry (or provider_hint override)
- Respects preview mode (no remote changes)
- Maps HTTP errors to domain-specific errors
"""
opts = options or EnsureOptions()
reg = registry or ProviderRegistry.default()
resolver = token_resolver or TokenResolver()
provider = reg.resolve(spec.host)
if provider_hint and provider_hint.kind:
forced = provider_hint.kind.strip().lower()
provider = next(
(p for p in reg.providers if getattr(p, "kind", "").lower() == forced),
None,
)
if provider is None:
raise UnsupportedProviderError(f"No provider matched host: {spec.host}")
token_opts = ResolutionOptions(
interactive=opts.interactive,
allow_prompt=opts.allow_prompt,
save_prompt_token_to_keyring=opts.save_prompt_token_to_keyring,
)
token = resolver.get_token(
provider_kind=getattr(provider, "kind", "unknown"),
host=spec.host,
owner=spec.owner,
options=token_opts,
)
if opts.preview:
return EnsureResult(
status="skipped",
message="Preview mode: no remote changes performed.",
)
try:
return provider.ensure_repo(token.token, spec)
except HttpError as exc:
_raise_mapped_http_error(exc, host=spec.host)
return EnsureResult(status="failed", message="Unreachable error mapping.")

View File

@@ -0,0 +1,5 @@
# src/pkgmgr/core/remote_provisioning/http/__init__.py
from .client import HttpClient, HttpResponse
from .errors import HttpError
__all__ = ["HttpClient", "HttpResponse", "HttpError"]

View File

@@ -0,0 +1,69 @@
# src/pkgmgr/core/remote_provisioning/http/client.py
from __future__ import annotations
import json
import ssl
import urllib.error
import urllib.request
from dataclasses import dataclass
from typing import Any, Dict, Optional
from .errors import HttpError
@dataclass(frozen=True)
class HttpResponse:
status: int
text: str
json: Optional[Dict[str, Any]] = None
class HttpClient:
"""Tiny HTTP client (stdlib) with JSON support."""
def __init__(self, timeout_s: int = 15) -> None:
self._timeout_s = int(timeout_s)
def request_json(
self,
method: str,
url: str,
headers: Optional[Dict[str, str]] = None,
payload: Optional[Dict[str, Any]] = None,
) -> HttpResponse:
data: Optional[bytes] = None
final_headers: Dict[str, str] = dict(headers or {})
if payload is not None:
data = json.dumps(payload).encode("utf-8")
final_headers.setdefault("Content-Type", "application/json")
req = urllib.request.Request(url=url, data=data, method=method.upper())
for k, v in final_headers.items():
req.add_header(k, v)
try:
with urllib.request.urlopen(
req,
timeout=self._timeout_s,
context=ssl.create_default_context(),
) as resp:
raw = resp.read().decode("utf-8", errors="replace")
parsed: Optional[Dict[str, Any]] = None
if raw:
try:
loaded = json.loads(raw)
parsed = loaded if isinstance(loaded, dict) else None
except Exception:
parsed = None
return HttpResponse(status=int(resp.status), text=raw, json=parsed)
except urllib.error.HTTPError as exc:
try:
body = exc.read().decode("utf-8", errors="replace")
except Exception:
body = ""
raise HttpError(status=int(exc.code), message=str(exc), body=body) from exc
except urllib.error.URLError as exc:
raise HttpError(status=0, message=str(exc), body="") from exc

View File

@@ -0,0 +1,9 @@
# src/pkgmgr/core/remote_provisioning/http/errors.py
from __future__ import annotations
class HttpError(RuntimeError):
def __init__(self, status: int, message: str, body: str = "") -> None:
super().__init__(message)
self.status = status
self.body = body

View File

@@ -0,0 +1,6 @@
# src/pkgmgr/core/remote_provisioning/providers/__init__.py
from .base import RemoteProvider
from .gitea import GiteaProvider
from .github import GitHubProvider
__all__ = ["RemoteProvider", "GiteaProvider", "GitHubProvider"]

View File

@@ -0,0 +1,36 @@
# src/pkgmgr/core/remote_provisioning/providers/base.py
from __future__ import annotations
from abc import ABC, abstractmethod
from ..types import EnsureResult, RepoSpec
class RemoteProvider(ABC):
"""Provider interface for remote repo provisioning."""
kind: str
@abstractmethod
def can_handle(self, host: str) -> bool:
"""Return True if this provider implementation matches the host."""
@abstractmethod
def repo_exists(self, token: str, spec: RepoSpec) -> bool:
"""Return True if repo exists and is accessible."""
@abstractmethod
def create_repo(self, token: str, spec: RepoSpec) -> EnsureResult:
"""Create a repository (owner may be user or org)."""
def ensure_repo(self, token: str, spec: RepoSpec) -> EnsureResult:
if self.repo_exists(token, spec):
return EnsureResult(status="exists", message="Repository exists.")
return self.create_repo(token, spec)
@staticmethod
def _api_base(host: str) -> str:
# Default to https. If you need http for local dev, store host as "http://..."
if host.startswith("http://") or host.startswith("https://"):
return host.rstrip("/")
return f"https://{host}".rstrip("/")

View File

@@ -0,0 +1,106 @@
# src/pkgmgr/core/remote_provisioning/providers/gitea.py
from __future__ import annotations
from typing import Any, Dict
from ..http.client import HttpClient
from ..http.errors import HttpError
from ..types import EnsureResult, RepoSpec
from .base import RemoteProvider
class GiteaProvider(RemoteProvider):
"""Gitea provider using Gitea REST API v1."""
kind = "gitea"
def __init__(self, timeout_s: int = 15) -> None:
self._http = HttpClient(timeout_s=timeout_s)
def can_handle(self, host: str) -> bool:
"""
Heuristic host match:
- Acts as a fallback provider for self-hosted setups.
- Must NOT claim GitHub hosts.
- If you add more providers later, tighten this heuristic or use provider hints.
"""
h = host.lower()
if h in ("github.com", "api.github.com") or h.endswith(".github.com"):
return False
return True
def _headers(self, token: str) -> Dict[str, str]:
"""
Gitea commonly supports:
Authorization: token <TOKEN>
Newer versions may also accept Bearer tokens, but "token" is broadly compatible.
"""
return {
"Authorization": f"token {token}",
"Accept": "application/json",
"User-Agent": "pkgmgr",
}
def repo_exists(self, token: str, spec: RepoSpec) -> bool:
base = self._api_base(spec.host)
url = f"{base}/api/v1/repos/{spec.owner}/{spec.name}"
try:
resp = self._http.request_json("GET", url, headers=self._headers(token))
return 200 <= resp.status < 300
except HttpError as exc:
if exc.status == 404:
return False
raise
def create_repo(self, token: str, spec: RepoSpec) -> EnsureResult:
base = self._api_base(spec.host)
payload: Dict[str, Any] = {
"name": spec.name,
"private": bool(spec.private),
}
if spec.description:
payload["description"] = spec.description
if spec.default_branch:
payload["default_branch"] = spec.default_branch
org_url = f"{base}/api/v1/orgs/{spec.owner}/repos"
user_url = f"{base}/api/v1/user/repos"
# Try org first, then fall back to user creation.
try:
resp = self._http.request_json(
"POST",
org_url,
headers=self._headers(token),
payload=payload,
)
if 200 <= resp.status < 300:
html_url = (resp.json or {}).get("html_url") if resp.json else None
return EnsureResult(
status="created",
message="Repository created (org).",
url=str(html_url) if html_url else None,
)
except HttpError:
# Typical org failures: 404 (not an org), 403 (no rights), 401 (bad token).
pass
resp = self._http.request_json(
"POST",
user_url,
headers=self._headers(token),
payload=payload,
)
if 200 <= resp.status < 300:
html_url = (resp.json or {}).get("html_url") if resp.json else None
return EnsureResult(
status="created",
message="Repository created (user).",
url=str(html_url) if html_url else None,
)
return EnsureResult(
status="failed",
message=f"Failed to create repository (status {resp.status}).",
)

View File

@@ -0,0 +1,101 @@
# src/pkgmgr/core/remote_provisioning/providers/github.py
from __future__ import annotations
from typing import Any, Dict
from ..http.client import HttpClient
from ..http.errors import HttpError
from ..types import EnsureResult, RepoSpec
from .base import RemoteProvider
class GitHubProvider(RemoteProvider):
"""GitHub provider using GitHub REST API."""
kind = "github"
def __init__(self, timeout_s: int = 15) -> None:
self._http = HttpClient(timeout_s=timeout_s)
def can_handle(self, host: str) -> bool:
h = host.lower()
return h in ("github.com", "api.github.com") or h.endswith(".github.com")
def _api_base(self, host: str) -> str:
"""
GitHub API base:
- Public GitHub: https://api.github.com
- GitHub Enterprise Server: https://<host>/api/v3
"""
h = host.lower()
if h in ("github.com", "api.github.com"):
return "https://api.github.com"
# Enterprise instance:
if host.startswith("http://") or host.startswith("https://"):
return host.rstrip("/") + "/api/v3"
return f"https://{host}/api/v3"
def _headers(self, token: str) -> Dict[str, str]:
return {
"Authorization": f"Bearer {token}",
"Accept": "application/vnd.github+json",
"User-Agent": "pkgmgr",
}
def repo_exists(self, token: str, spec: RepoSpec) -> bool:
api = self._api_base(spec.host)
url = f"{api}/repos/{spec.owner}/{spec.name}"
try:
resp = self._http.request_json("GET", url, headers=self._headers(token))
return 200 <= resp.status < 300
except HttpError as exc:
if exc.status == 404:
return False
raise
def create_repo(self, token: str, spec: RepoSpec) -> EnsureResult:
api = self._api_base(spec.host)
payload: Dict[str, Any] = {
"name": spec.name,
"private": bool(spec.private),
}
if spec.description:
payload["description"] = spec.description
if spec.default_branch:
payload["default_branch"] = spec.default_branch
org_url = f"{api}/orgs/{spec.owner}/repos"
user_url = f"{api}/user/repos"
# Try org first, then fall back to user creation.
try:
resp = self._http.request_json(
"POST", org_url, headers=self._headers(token), payload=payload
)
if 200 <= resp.status < 300:
html_url = (resp.json or {}).get("html_url") if resp.json else None
return EnsureResult(
status="created",
message="Repository created (org).",
url=str(html_url) if html_url else None,
)
except HttpError:
pass
resp = self._http.request_json(
"POST", user_url, headers=self._headers(token), payload=payload
)
if 200 <= resp.status < 300:
html_url = (resp.json or {}).get("html_url") if resp.json else None
return EnsureResult(
status="created",
message="Repository created (user).",
url=str(html_url) if html_url else None,
)
return EnsureResult(
status="failed",
message=f"Failed to create repository (status {resp.status}).",
)

View File

@@ -0,0 +1,30 @@
# src/pkgmgr/core/remote_provisioning/registry.py
from __future__ import annotations
from dataclasses import dataclass
from typing import List, Optional
from .providers.base import RemoteProvider
from .providers.gitea import GiteaProvider
from .providers.github import GitHubProvider
@dataclass
class ProviderRegistry:
"""Resolve the correct provider implementation for a host."""
providers: List[RemoteProvider]
@classmethod
def default(cls) -> "ProviderRegistry":
# Order matters: more specific providers first; fallback providers last.
return cls(providers=[GitHubProvider(), GiteaProvider()])
def resolve(self, host: str) -> Optional[RemoteProvider]:
for p in self.providers:
try:
if p.can_handle(host):
return p
except Exception:
continue
return None

View File

@@ -0,0 +1,61 @@
# src/pkgmgr/core/remote_provisioning/types.py
from __future__ import annotations
from dataclasses import dataclass
from typing import Literal, Optional
EnsureStatus = Literal["exists", "created", "skipped", "failed"]
@dataclass(frozen=True)
class ProviderHint:
"""Optional hint to force a provider kind."""
kind: Optional[str] = None # e.g. "gitea" or "github"
@dataclass(frozen=True)
class RepoSpec:
"""Desired remote repository."""
host: str
owner: str
name: str
private: bool = True
description: str = ""
default_branch: Optional[str] = None
@dataclass(frozen=True)
class EnsureResult:
status: EnsureStatus
message: str
url: Optional[str] = None
class RemoteProvisioningError(RuntimeError):
"""Base class for remote provisioning errors."""
class AuthError(RemoteProvisioningError):
"""Authentication failed (401)."""
class PermissionError(RemoteProvisioningError):
"""Permission denied (403)."""
class NotFoundError(RemoteProvisioningError):
"""Resource not found (404)."""
class PolicyError(RemoteProvisioningError):
"""Provider/org policy prevents the operation."""
class NetworkError(RemoteProvisioningError):
"""Network/transport errors."""
class UnsupportedProviderError(RemoteProvisioningError):
"""No provider matched for the given host."""

24
tests/e2e/_util.py Normal file
View File

@@ -0,0 +1,24 @@
import subprocess
def run(cmd, *, cwd=None, env=None, shell=False) -> str:
proc = subprocess.run(
cmd,
cwd=cwd,
env=env,
shell=shell,
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
print("----- BEGIN COMMAND -----")
print(cmd if isinstance(cmd, str) else " ".join(cmd))
print("----- OUTPUT -----")
print(proc.stdout.rstrip())
print("----- END COMMAND -----")
if proc.returncode != 0:
raise AssertionError(proc.stdout)
return proc.stdout

View File

@@ -0,0 +1,25 @@
from tests.e2e._util import run
import tempfile
import unittest
from pathlib import Path
class TestMakefileThreeTimes(unittest.TestCase):
def test_make_install_three_times(self):
with tempfile.TemporaryDirectory(prefix="makefile-3x-") as tmp:
repo = Path(tmp)
# Minimal Makefile with install target
(repo / "Makefile").write_text(
"install:\n\t@echo install >> install.log\n"
)
for i in range(1, 4):
print(f"\n=== RUN {i}/3 ===")
run(["make", "install"], cwd=repo)
log = (repo / "install.log").read_text().splitlines()
self.assertEqual(
len(log),
3,
"make install should have been executed exactly three times",
)

View File

@@ -0,0 +1,37 @@
import os
from tests.e2e._util import run
import tempfile
import unittest
from pathlib import Path
class TestPkgmgrInstallThreeTimesNix(unittest.TestCase):
def test_three_times_install_nix(self):
with tempfile.TemporaryDirectory(prefix="pkgmgr-nix-") as tmp:
tmp_path = Path(tmp)
env = os.environ.copy()
env["HOME"] = tmp
# Ensure nix is found
env["PATH"] = "/nix/var/nix/profiles/default/bin:" + os.environ.get("PATH", "")
# IMPORTANT:
# nix run uses git+file:///src internally -> Git will reject /src if it's not a safe.directory.
# Our test sets HOME to a temp dir, so we must provide a temp global gitconfig.
gitconfig = tmp_path / ".gitconfig"
gitconfig.write_text(
"[safe]\n"
"\tdirectory = /src\n"
"\tdirectory = /src/.git\n"
"\tdirectory = *\n"
)
env["GIT_CONFIG_GLOBAL"] = str(gitconfig)
for i in range(1, 4):
print(f"\n=== RUN {i}/3 ===")
run(
"nix run .#pkgmgr -- install pkgmgr --update --clone-mode shallow --no-verification",
env=env,
shell=True,
)

View File

@@ -0,0 +1,34 @@
from tests.e2e._util import run
import tempfile
import unittest
from pathlib import Path
import os
class TestPkgmgrInstallThreeTimesVenv(unittest.TestCase):
def test_three_times_install_venv(self):
with tempfile.TemporaryDirectory(prefix="pkgmgr-venv-") as tmp:
home = Path(tmp)
bin_dir = home / ".local" / "bin"
bin_dir.mkdir(parents=True)
env = os.environ.copy()
env["HOME"] = tmp
# pkgmgr kommt aus dem Projekt-venv
env["PATH"] = (
f"{Path.cwd() / '.venv' / 'bin'}:"
f"{bin_dir}:"
+ os.environ.get("PATH", "")
)
# nix explizit deaktivieren → Python/Venv-Pfad
env["PKGMGR_DISABLE_NIX_FLAKE_INSTALLER"] = "1"
for i in range(1, 4):
print(f"\n=== RUN {i}/3 ===")
run(
"pkgmgr install pkgmgr --update --clone-mode shallow --no-verification",
env=env,
shell=True,
)

View File

@@ -4,21 +4,21 @@
"""
E2E integration tests for the `pkgmgr mirror` command family.
This test class covers:
Covered commands:
- pkgmgr mirror --help
- pkgmgr mirror list --preview --all
- pkgmgr mirror diff --preview --all
- pkgmgr mirror merge config file --preview --all
- pkgmgr mirror setup --preview --all
- pkgmgr mirror check --preview --all
- pkgmgr mirror provision --preview --all
All of these subcommands are fully wired at CLI level and do not
require mocks. With --preview, merge and setup do not perform
destructive actions, making them safe for CI execution.
All commands are executed via the real CLI entry point (main module).
With --preview enabled, all operations are non-destructive and safe
to run inside CI containers.
"""
from __future__ import annotations
import io
import runpy
import sys
@@ -28,25 +28,25 @@ from contextlib import redirect_stdout, redirect_stderr
class TestIntegrationMirrorCommands(unittest.TestCase):
"""
E2E tests for `pkgmgr mirror` commands.
End-to-end tests for `pkgmgr mirror` commands.
"""
# ------------------------------------------------------------
# Helper
# ------------------------------------------------------------
def _run_pkgmgr(self, args: list[str]) -> str:
def _run_pkgmgr(self, args):
"""
Execute pkgmgr with the given arguments and return captured stdout+stderr.
Execute pkgmgr with the given arguments and return captured output.
- Treat SystemExit(0) or SystemExit(None) as success.
- Convert non-zero exit codes into AssertionError.
- Any other exit code is considered a test failure.
"""
original_argv = list(sys.argv)
buffer = io.StringIO()
cmd_repr = "pkgmgr " + " ".join(args)
try:
sys.argv = ["pkgmgr"] + args
sys.argv = ["pkgmgr"] + list(args)
try:
with redirect_stdout(buffer), redirect_stderr(buffer):
@@ -55,9 +55,9 @@ class TestIntegrationMirrorCommands(unittest.TestCase):
code = exc.code if isinstance(exc.code, int) else None
if code not in (0, None):
raise AssertionError(
f"{cmd_repr!r} failed with exit code {exc.code}. "
"Scroll up to inspect the pkgmgr output."
) from exc
"%r failed with exit code %r.\n\nOutput:\n%s"
% (cmd_repr, exc.code, buffer.getvalue())
)
return buffer.getvalue()
@@ -68,44 +68,41 @@ class TestIntegrationMirrorCommands(unittest.TestCase):
# Tests
# ------------------------------------------------------------
def test_mirror_help(self) -> None:
def test_mirror_help(self):
"""
Ensure `pkgmgr mirror --help` runs successfully
and prints a usage message for the mirror command.
`pkgmgr mirror --help` should run without error and print usage info.
"""
output = self._run_pkgmgr(["mirror", "--help"])
self.assertIn("usage:", output)
self.assertIn("pkgmgr mirror", output)
def test_mirror_list_preview_all(self) -> None:
def test_mirror_list_preview_all(self):
"""
`pkgmgr mirror list --preview --all` should run without error
and produce some output for the selected repositories.
`pkgmgr mirror list --preview --all`
"""
output = self._run_pkgmgr(["mirror", "list", "--preview", "--all"])
# Do not assert specific wording; just ensure something was printed.
output = self._run_pkgmgr(
["mirror", "list", "--preview", "--all"]
)
self.assertTrue(
output.strip(),
msg="Expected `pkgmgr mirror list --preview --all` to produce output.",
"Expected output from mirror list",
)
def test_mirror_diff_preview_all(self) -> None:
def test_mirror_diff_preview_all(self):
"""
`pkgmgr mirror diff --preview --all` should run without error
and produce some diagnostic output (diff header, etc.).
`pkgmgr mirror diff --preview --all`
"""
output = self._run_pkgmgr(["mirror", "diff", "--preview", "--all"])
output = self._run_pkgmgr(
["mirror", "diff", "--preview", "--all"]
)
self.assertTrue(
output.strip(),
msg="Expected `pkgmgr mirror diff --preview --all` to produce output.",
"Expected output from mirror diff",
)
def test_mirror_merge_config_to_file_preview_all(self) -> None:
def test_mirror_merge_config_to_file_preview_all(self):
"""
`pkgmgr mirror merge config file --preview --all` should run without error.
In preview mode this does not change either config or MIRRORS files;
it only prints what would be merged.
`pkgmgr mirror merge config file --preview --all`
"""
output = self._run_pkgmgr(
[
@@ -119,23 +116,47 @@ class TestIntegrationMirrorCommands(unittest.TestCase):
)
self.assertTrue(
output.strip(),
msg=(
"Expected `pkgmgr mirror merge config file --preview --all` "
"to produce output."
),
"Expected output from mirror merge (config -> file)",
)
def test_mirror_setup_preview_all(self) -> None:
def test_mirror_setup_preview_all(self):
"""
`pkgmgr mirror setup --preview --all` should run without error.
In preview mode only the intended Git operations and remote
suggestions are printed; no real changes are made.
`pkgmgr mirror setup --preview --all`
"""
output = self._run_pkgmgr(["mirror", "setup", "--preview", "--all"])
output = self._run_pkgmgr(
["mirror", "setup", "--preview", "--all"]
)
self.assertTrue(
output.strip(),
msg="Expected `pkgmgr mirror setup --preview --all` to produce output.",
"Expected output from mirror setup",
)
def test_mirror_check_preview_all(self):
"""
`pkgmgr mirror check --preview --all`
Performs non-destructive remote checks (git ls-remote).
"""
output = self._run_pkgmgr(
["mirror", "check", "--preview", "--all"]
)
self.assertTrue(
output.strip(),
"Expected output from mirror check",
)
def test_mirror_provision_preview_all(self):
"""
`pkgmgr mirror provision --preview --all`
In preview mode this MUST NOT create remote repositories.
"""
output = self._run_pkgmgr(
["mirror", "provision", "--preview", "--all"]
)
self.assertTrue(
output.strip(),
"Expected output from mirror provision (preview)",
)

View File

@@ -1,92 +0,0 @@
"""
Integration test: update all configured repositories using
--clone-mode https and --no-verification.
This test is intended to be run inside the Docker container where:
- network access is available,
- the config/config.yaml is present,
- and it is safe to perform real git operations.
It passes if BOTH commands complete successfully (in separate tests):
1) pkgmgr update --all --clone-mode https --no-verification
2) nix run .#pkgmgr -- update --all --clone-mode https --no-verification
"""
import os
import subprocess
import unittest
from test_install_pkgmgr_shallow import (
nix_profile_list_debug,
remove_pkgmgr_from_nix_profile,
pkgmgr_help_debug,
)
class TestIntegrationUpdateAllHttps(unittest.TestCase):
def _run_cmd(self, cmd: list[str], label: str) -> None:
"""
Run a real CLI command and raise a helpful assertion on failure.
"""
cmd_repr = " ".join(cmd)
env = os.environ.copy()
try:
print(f"\n[TEST] Running ({label}): {cmd_repr}")
subprocess.run(
cmd,
check=True,
cwd=os.getcwd(),
env=env,
text=True,
)
except subprocess.CalledProcessError as exc:
print(f"\n[TEST] Command failed ({label})")
print(f"[TEST] Command : {cmd_repr}")
print(f"[TEST] Exit code: {exc.returncode}")
nix_profile_list_debug(f"ON FAILURE ({label})")
raise AssertionError(
f"({label}) {cmd_repr!r} failed with exit code {exc.returncode}. "
"Scroll up to see the full pkgmgr/nix output inside the container."
) from exc
def _common_setup(self) -> None:
# Debug before cleanup
nix_profile_list_debug("BEFORE CLEANUP")
# Cleanup: aggressively try to drop any pkgmgr/profile entries
# (keeps the environment comparable to other integration tests).
remove_pkgmgr_from_nix_profile()
# Debug after cleanup
nix_profile_list_debug("AFTER CLEANUP")
def test_update_all_repositories_https_pkgmgr(self) -> None:
"""
Run: pkgmgr update --all --clone-mode https --no-verification
"""
self._common_setup()
args = ["update", "--all", "--clone-mode", "https", "--no-verification"]
self._run_cmd(["pkgmgr", *args], label="pkgmgr")
# After successful update: show `pkgmgr --help` via interactive bash
pkgmgr_help_debug()
def test_update_all_repositories_https_nix_pkgmgr(self) -> None:
"""
Run: nix run .#pkgmgr -- update --all --clone-mode https --no-verification
"""
self._common_setup()
args = ["update", "--all", "--clone-mode", "https", "--no-verification"]
self._run_cmd(["nix", "run", ".#pkgmgr", "--", *args], label="nix run .#pkgmgr")
# After successful update: show `pkgmgr --help` via interactive bash
pkgmgr_help_debug()
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,123 @@
"""
Integration test: update all configured repositories using
--clone-mode shallow and --no-verification, WITHOUT system updates.
This test is intended to be run inside the Docker container where:
- network access is available,
- the config/config.yaml is present,
- and it is safe to perform real git operations.
It passes if BOTH commands complete successfully (in separate tests):
1) pkgmgr update --all --clone-mode shallow --no-verification
2) nix run .#pkgmgr -- update --all --clone-mode shallow --no-verification
"""
from __future__ import annotations
import os
import subprocess
import tempfile
import unittest
from pathlib import Path
from test_install_pkgmgr_shallow import (
nix_profile_list_debug,
remove_pkgmgr_from_nix_profile,
pkgmgr_help_debug,
)
def _make_temp_gitconfig_with_safe_dirs(home: Path) -> Path:
gitconfig = home / ".gitconfig"
gitconfig.write_text(
"[safe]\n"
"\tdirectory = /src\n"
"\tdirectory = /src/.git\n"
"\tdirectory = *\n"
)
return gitconfig
class TestIntegrationUpdateAllshallowNoSystem(unittest.TestCase):
def _common_env(self, home_dir: str) -> dict[str, str]:
env = os.environ.copy()
env["HOME"] = home_dir
home = Path(home_dir)
home.mkdir(parents=True, exist_ok=True)
env["GIT_CONFIG_GLOBAL"] = str(_make_temp_gitconfig_with_safe_dirs(home))
# Ensure nix is discoverable if the container has it
env["PATH"] = "/nix/var/nix/profiles/default/bin:" + env.get("PATH", "")
return env
def _run_cmd(self, cmd: list[str], label: str, env: dict[str, str]) -> None:
cmd_repr = " ".join(cmd)
print(f"\n[TEST] Running ({label}): {cmd_repr}")
proc = subprocess.run(
cmd,
check=False,
cwd=os.getcwd(),
env=env,
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
print(proc.stdout.rstrip())
if proc.returncode != 0:
print(f"\n[TEST] Command failed ({label})")
print(f"[TEST] Command : {cmd_repr}")
print(f"[TEST] Exit code: {proc.returncode}")
nix_profile_list_debug(f"ON FAILURE ({label})")
raise AssertionError(
f"({label}) {cmd_repr!r} failed with exit code {proc.returncode}.\n\n"
f"--- output ---\n{proc.stdout}\n"
)
def _common_setup(self) -> None:
nix_profile_list_debug("BEFORE CLEANUP")
remove_pkgmgr_from_nix_profile()
nix_profile_list_debug("AFTER CLEANUP")
def test_update_all_repositories_shallow_pkgmgr_no_system(self) -> None:
self._common_setup()
with tempfile.TemporaryDirectory(prefix="pkgmgr-updateall-nosys-") as tmp:
env = self._common_env(tmp)
args = [
"update",
"--all",
"--clone-mode",
"shallow",
"--no-verification",
]
self._run_cmd(["pkgmgr", *args], label="pkgmgr", env=env)
pkgmgr_help_debug()
def test_update_all_repositories_shallow_nix_pkgmgr_no_system(self) -> None:
self._common_setup()
with tempfile.TemporaryDirectory(prefix="pkgmgr-updateall-nosys-nix-") as tmp:
env = self._common_env(tmp)
args = [
"update",
"--all",
"--clone-mode",
"shallow",
"--no-verification",
]
self._run_cmd(
["nix", "run", ".#pkgmgr", "--", *args],
label="nix run .#pkgmgr",
env=env,
)
pkgmgr_help_debug()
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,124 @@
"""
Integration test: update ONLY the 'pkgmgr' repository with system updates enabled.
This test is intended to be run inside the Docker container where:
- network access is available,
- the config/config.yaml is present,
- and it is safe to perform real git operations.
It passes if BOTH commands complete successfully (in separate tests):
1) pkgmgr update pkgmgr --clone-mode shallow --no-verification --system
2) nix run .#pkgmgr -- update pkgmgr --clone-mode shallow --no-verification --system
"""
from __future__ import annotations
import os
import subprocess
import tempfile
import unittest
from pathlib import Path
from test_install_pkgmgr_shallow import (
nix_profile_list_debug,
remove_pkgmgr_from_nix_profile,
pkgmgr_help_debug,
)
def _make_temp_gitconfig_with_safe_dirs(home: Path) -> Path:
gitconfig = home / ".gitconfig"
gitconfig.write_text(
"[safe]\n"
"\tdirectory = /src\n"
"\tdirectory = /src/.git\n"
"\tdirectory = *\n"
)
return gitconfig
class TestIntegrationUpdatePkgmgrWithSystem(unittest.TestCase):
def _common_env(self, home_dir: str) -> dict[str, str]:
env = os.environ.copy()
env["HOME"] = home_dir
home = Path(home_dir)
home.mkdir(parents=True, exist_ok=True)
env["GIT_CONFIG_GLOBAL"] = str(_make_temp_gitconfig_with_safe_dirs(home))
# Ensure nix is discoverable if the container has it
env["PATH"] = "/nix/var/nix/profiles/default/bin:" + env.get("PATH", "")
return env
def _run_cmd(self, cmd: list[str], label: str, env: dict[str, str]) -> None:
cmd_repr = " ".join(cmd)
print(f"\n[TEST] Running ({label}): {cmd_repr}")
proc = subprocess.run(
cmd,
check=False,
cwd=os.getcwd(),
env=env,
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
print(proc.stdout.rstrip())
if proc.returncode != 0:
print(f"\n[TEST] Command failed ({label})")
print(f"[TEST] Command : {cmd_repr}")
print(f"[TEST] Exit code: {proc.returncode}")
nix_profile_list_debug(f"ON FAILURE ({label})")
raise AssertionError(
f"({label}) {cmd_repr!r} failed with exit code {proc.returncode}.\n\n"
f"--- output ---\n{proc.stdout}\n"
)
def _common_setup(self) -> None:
nix_profile_list_debug("BEFORE CLEANUP")
remove_pkgmgr_from_nix_profile()
nix_profile_list_debug("AFTER CLEANUP")
def test_update_pkgmgr_shallow_pkgmgr_with_system(self) -> None:
self._common_setup()
with tempfile.TemporaryDirectory(prefix="pkgmgr-update-pkgmgr-sys-") as tmp:
env = self._common_env(tmp)
args = [
"update",
"pkgmgr",
"--clone-mode",
"shallow",
"--no-verification",
"--system",
]
self._run_cmd(["pkgmgr", *args], label="pkgmgr", env=env)
pkgmgr_help_debug()
def test_update_pkgmgr_shallow_nix_pkgmgr_with_system(self) -> None:
self._common_setup()
with tempfile.TemporaryDirectory(prefix="pkgmgr-update-pkgmgr-sys-nix-") as tmp:
env = self._common_env(tmp)
args = [
"update",
"pkgmgr",
"--clone-mode",
"shallow",
"--no-verification",
"--system",
]
self._run_cmd(
["nix", "run", ".#pkgmgr", "--", *args],
label="nix run .#pkgmgr",
env=env,
)
pkgmgr_help_debug()
if __name__ == "__main__":
unittest.main()

View File

@@ -5,15 +5,18 @@
Unit tests for NixFlakeInstaller using unittest (no pytest).
Covers:
- Successful installation (exit_code == 0)
- Successful installation (returncode == 0)
- Mandatory failure → SystemExit with correct code
- Optional failure (pkgmgr default) → no raise, but warning
- supports() behavior incl. PKGMGR_DISABLE_NIX_FLAKE_INSTALLER
"""
from __future__ import annotations
import io
import os
import shutil
import subprocess
import tempfile
import unittest
from contextlib import redirect_stdout
@@ -25,10 +28,19 @@ from pkgmgr.actions.install.installers.nix_flake import NixFlakeInstaller
class DummyCtx:
"""Minimal context object to satisfy NixFlakeInstaller.run() / supports()."""
def __init__(self, identifier: str, repo_dir: str, preview: bool = False):
def __init__(
self,
identifier: str,
repo_dir: str,
preview: bool = False,
quiet: bool = False,
force_update: bool = False,
):
self.identifier = identifier
self.repo_dir = repo_dir
self.preview = preview
self.quiet = quiet
self.force_update = force_update
class TestNixFlakeInstaller(unittest.TestCase):
@@ -44,161 +56,162 @@ class TestNixFlakeInstaller(unittest.TestCase):
os.environ.pop("PKGMGR_DISABLE_NIX_FLAKE_INSTALLER", None)
def tearDown(self) -> None:
# Cleanup temporary directory
if os.path.isdir(self._tmpdir):
shutil.rmtree(self._tmpdir, ignore_errors=True)
def _enable_nix_in_module(self, which_patch):
@staticmethod
def _cp(code: int) -> subprocess.CompletedProcess:
# stdout/stderr are irrelevant here, but keep shape realistic
return subprocess.CompletedProcess(args=["nix"], returncode=code, stdout="", stderr="")
@staticmethod
def _enable_nix_in_module(which_patch) -> None:
"""Ensure shutil.which('nix') in nix_flake module returns a path."""
which_patch.return_value = "/usr/bin/nix"
def test_nix_flake_run_success(self):
def test_nix_flake_run_success(self) -> None:
"""
When os.system returns a successful exit code, the installer
When run_command returns success (returncode 0), installer
should report success and not raise.
"""
ctx = DummyCtx(identifier="some-lib", repo_dir=self.repo_dir)
installer = NixFlakeInstaller()
buf = io.StringIO()
with patch(
"pkgmgr.actions.install.installers.nix_flake.shutil.which"
) as which_mock, patch(
"pkgmgr.actions.install.installers.nix_flake.os.system"
) as system_mock, redirect_stdout(buf):
with patch("pkgmgr.actions.install.installers.nix_flake.shutil.which") as which_mock, patch(
"pkgmgr.actions.install.installers.nix_flake.subprocess.run"
) as subproc_mock, patch(
"pkgmgr.actions.install.installers.nix_flake.run_command"
) as run_cmd_mock, redirect_stdout(buf):
self._enable_nix_in_module(which_mock)
# Simulate os.system returning success (exit code 0)
system_mock.return_value = 0
# For profile list JSON (used only on failure paths, but keep deterministic)
subproc_mock.return_value = subprocess.CompletedProcess(
args=["nix", "profile", "list", "--json"],
returncode=0,
stdout='{"elements": []}',
stderr="",
)
# Install succeeds
run_cmd_mock.return_value = self._cp(0)
# Sanity: supports() must be True
self.assertTrue(installer.supports(ctx))
installer.run(ctx)
out = buf.getvalue()
self.assertIn("[INFO] Running: nix profile install", out)
self.assertIn("Nix flake output 'default' successfully installed.", out)
self.assertIn("[nix] install: nix profile install", out)
self.assertIn("[nix] output 'default' successfully installed.", out)
# Ensure the nix command was actually invoked
system_mock.assert_called_with(
f"nix profile install {self.repo_dir}#default"
run_cmd_mock.assert_called_with(
f"nix profile install {self.repo_dir}#default",
cwd=self.repo_dir,
preview=False,
allow_failure=True,
)
def test_nix_flake_run_mandatory_failure_raises(self):
def test_nix_flake_run_mandatory_failure_raises(self) -> None:
"""
For a generic repository (identifier not pkgmgr/package-manager),
`default` is mandatory and a non-zero exit code should raise SystemExit
with the real exit code (e.g. 1, not 256).
For a generic repository, 'default' is mandatory.
A non-zero return code must raise SystemExit with that code.
"""
ctx = DummyCtx(identifier="some-lib", repo_dir=self.repo_dir)
installer = NixFlakeInstaller()
buf = io.StringIO()
with patch(
"pkgmgr.actions.install.installers.nix_flake.shutil.which"
) as which_mock, patch(
"pkgmgr.actions.install.installers.nix_flake.os.system"
) as system_mock, redirect_stdout(buf):
with patch("pkgmgr.actions.install.installers.nix_flake.shutil.which") as which_mock, patch(
"pkgmgr.actions.install.installers.nix_flake.subprocess.run"
) as subproc_mock, patch(
"pkgmgr.actions.install.installers.nix_flake.run_command"
) as run_cmd_mock, redirect_stdout(buf):
self._enable_nix_in_module(which_mock)
# Simulate os.system returning encoded status for exit code 1
# os.system encodes exit code as (exit_code << 8)
system_mock.return_value = 1 << 8
# No indices available (empty list)
subproc_mock.return_value = subprocess.CompletedProcess(
args=["nix", "profile", "list", "--json"],
returncode=0,
stdout='{"elements": []}',
stderr="",
)
# First install fails, retry fails -> should raise SystemExit(1)
run_cmd_mock.side_effect = [self._cp(1), self._cp(1)]
self.assertTrue(installer.supports(ctx))
with self.assertRaises(SystemExit) as cm:
installer.run(ctx)
# The real exit code should be 1 (not 256)
self.assertEqual(cm.exception.code, 1)
out = buf.getvalue()
self.assertIn("[INFO] Running: nix profile install", out)
self.assertIn("[Error] Failed to install Nix flake output 'default'", out)
self.assertIn("[Error] Command exited with code 1", out)
self.assertIn("[nix] install: nix profile install", out)
self.assertIn("[ERROR] Failed to install Nix flake output 'default' (exit 1)", out)
def test_nix_flake_run_optional_failure_does_not_raise(self):
def test_nix_flake_run_optional_failure_does_not_raise(self) -> None:
"""
For the package-manager repository, the 'default' output is optional.
Failure to install it must not raise, but should log a warning instead.
For pkgmgr/package-manager repositories:
- 'pkgmgr' output is mandatory
- 'default' output is optional
Failure of optional output must not raise.
"""
ctx = DummyCtx(identifier="pkgmgr", repo_dir=self.repo_dir)
installer = NixFlakeInstaller()
calls = []
def fake_system(cmd: str) -> int:
calls.append(cmd)
# First call (pkgmgr) → success
if len(calls) == 1:
return 0
# Second call (default) → failure (exit code 1 encoded)
return 1 << 8
buf = io.StringIO()
with patch(
"pkgmgr.actions.install.installers.nix_flake.shutil.which"
) as which_mock, patch(
"pkgmgr.actions.install.installers.nix_flake.os.system",
side_effect=fake_system,
), redirect_stdout(buf):
with patch("pkgmgr.actions.install.installers.nix_flake.shutil.which") as which_mock, patch(
"pkgmgr.actions.install.installers.nix_flake.subprocess.run"
) as subproc_mock, patch(
"pkgmgr.actions.install.installers.nix_flake.run_command"
) as run_cmd_mock, redirect_stdout(buf):
self._enable_nix_in_module(which_mock)
# No indices available (empty list)
subproc_mock.return_value = subprocess.CompletedProcess(
args=["nix", "profile", "list", "--json"],
returncode=0,
stdout='{"elements": []}',
stderr="",
)
# pkgmgr install ok; default fails twice (initial + retry)
run_cmd_mock.side_effect = [self._cp(0), self._cp(1), self._cp(1)]
self.assertTrue(installer.supports(ctx))
# Optional failure must NOT raise
# Must NOT raise despite optional failure
installer.run(ctx)
out = buf.getvalue()
# Both outputs should have been mentioned
self.assertIn(
"attempting to install profile outputs: pkgmgr, default", out
)
# Should announce both outputs
self.assertIn("ensuring outputs: pkgmgr, default", out)
# First output ("pkgmgr") succeeded
self.assertIn(
"Nix flake output 'pkgmgr' successfully installed.", out
)
# First output ok
self.assertIn("[nix] output 'pkgmgr' successfully installed.", out)
# Second output ("default") failed but did not raise
self.assertIn(
"[Error] Failed to install Nix flake output 'default'", out
)
self.assertIn("[Error] Command exited with code 1", out)
self.assertIn(
"Continuing despite failure to install optional output 'default'.",
out,
)
# Second output failed but no raise
self.assertIn("[ERROR] Failed to install Nix flake output 'default' (exit 1)", out)
self.assertIn("[WARNING] Continuing despite failure of optional output 'default'.", out)
# Ensure we actually called os.system twice (pkgmgr and default)
self.assertEqual(len(calls), 2)
self.assertIn(
f"nix profile install {self.repo_dir}#pkgmgr",
calls[0],
)
self.assertIn(
f"nix profile install {self.repo_dir}#default",
calls[1],
)
# Verify run_command was called for both outputs (default twice due to retry)
expected_calls = [
(f"nix profile install {self.repo_dir}#pkgmgr",),
(f"nix profile install {self.repo_dir}#default",),
(f"nix profile install {self.repo_dir}#default",),
]
actual_cmds = [c.args[0] for c in run_cmd_mock.call_args_list]
self.assertEqual(actual_cmds, [e[0] for e in expected_calls])
def test_nix_flake_supports_respects_disable_env(self):
def test_nix_flake_supports_respects_disable_env(self) -> None:
"""
PKGMGR_DISABLE_NIX_FLAKE_INSTALLER=1 must disable the installer,
even if flake.nix exists and nix is available.
"""
ctx = DummyCtx(identifier="pkgmgr", repo_dir=self.repo_dir)
ctx = DummyCtx(identifier="pkgmgr", repo_dir=self.repo_dir, quiet=False)
installer = NixFlakeInstaller()
with patch(
"pkgmgr.actions.install.installers.nix_flake.shutil.which"
) as which_mock:
with patch("pkgmgr.actions.install.installers.nix_flake.shutil.which") as which_mock:
self._enable_nix_in_module(which_mock)
os.environ["PKGMGR_DISABLE_NIX_FLAKE_INSTALLER"] = "1"
self.assertFalse(installer.supports(ctx))