Compare commits

...

19 Commits

Author SHA1 Message Date
Kevin Veen-Birkenbach
3d7d7e9c09 Release version 1.6.4
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / linter-shell (push) Has been cancelled
Mark stable commit / linter-python (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
2025-12-14 19:33:07 +01:00
Kevin Veen-Birkenbach
328203ccd7 **test(nix): add comprehensive unittest coverage for nix installer helpers**
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / linter-shell (push) Has been cancelled
Mark stable commit / linter-python (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
* Add reusable fakes for runner and retry logic
* Cover conflict resolution paths (store-prefix, output-token, textual fallback)
* Add unit tests for profile parsing, normalization, matching, and text parsing
* Verify installer core behavior for success, mandatory failure, and optional failure
* Keep tests Nix-free using pure unittest + mocks

https://chatgpt.com/share/693efe80-d928-800f-98b7-0aaafee1d32a
2025-12-14 19:27:26 +01:00
Kevin Veen-Birkenbach
ac16378807 Deleted deprecated unit tests:
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / linter-shell (push) Has been cancelled
Mark stable commit / linter-python (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
https://chatgpt.com/share/693efe80-d928-800f-98b7-0aaafee1d32a
2025-12-14 19:14:42 +01:00
Kevin Veen-Birkenbach
f7a86bc353 fix(launcher): avoid calling missing retry helper in packaged installs
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / linter-shell (push) Has been cancelled
Mark stable commit / linter-python (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
- Load GitHub 403 retry helper only when available
- Fallback to plain `nix run` if retry function is not defined
- Prevent exit 127 when pkgmgr launcher is installed without retry script
- Fix E2E failure for `pkgmgr update pkgmgr --system`

https://chatgpt.com/share/693efd23-8b60-800f-adbb-9dfffc33f1f7
2025-12-14 19:08:32 +01:00
Kevin Veen-Birkenbach
06a6a77a48 *fix(nix): resolve nix profile conflicts without numeric indices and fix update pkgmgr system test*
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / linter-shell (push) Has been cancelled
Mark stable commit / linter-python (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
* Switch conflict handling from index-based removal to token-based removal (*nix profile remove <name>*) for newer nix versions
* Add robust parsing of *nix profile list --json* with normalization and heuristics for output/name matching
* Detect at runtime whether numeric profile indices are supported and fall back automatically when they are not
* Ensure *pkgmgr* / *package-manager* flake outputs are correctly identified and cleaned up during reinstall
* Fix failing E2E test *test_update_pkgmgr_shallow_pkgmgr_with_system* by reliably removing conflicting profile entries before reinstall

https://chatgpt.com/share/693efae5-b8bc-800f-94e3-28c93b74ed7b
2025-12-14 18:58:29 +01:00
Kevin Veen-Birkenbach
4883e40812 fix(ci): skip container publish when no version tag exists
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / linter-shell (push) Has been cancelled
Mark stable commit / linter-python (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
* Remove unsupported `fetch-tags` input from checkout step
* Detect missing `v*` tag on workflow_run SHA and exit successfully
* Gate Buildx, GHCR login, and publish steps behind `should_publish` flag

https://chatgpt.com/share/693ee7f1-ed80-800f-bb03-369a1cc659e3
2025-12-14 17:38:06 +01:00
Kevin Veen-Birkenbach
031ae5ac69 test(integration): fix mirror tests by removing non-existent check_cmd patches
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / linter-shell (push) Has been cancelled
Mark stable commit / linter-python (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
- Remove patches referencing pkgmgr.actions.mirror.check_cmd (module does not exist)
- Patch actual mirror probe/remote helpers used at runtime
- Make mirror integration tests deterministic and CI-safe

https://chatgpt.com/share/693ee657-b260-800f-a69a-8b0680e6baa5
2025-12-14 17:31:05 +01:00
Kevin Veen-Birkenbach
1c4fc531fa fix(shellcheck): correct source path hint for retry_403 helper
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / linter-shell (push) Has been cancelled
Mark stable commit / linter-python (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
- Align ShellCheck source hint with repository layout
- Fix SC1091 without disabling checks
- Runtime sourcing via ${RETRY_LIB} remains unchanged

https://chatgpt.com/share/693ee308-6c48-800f-b14f-7d6081e14eb4
2025-12-14 17:16:35 +01:00
Kevin Veen-Birkenbach
33dfbf3a4d test(env-virtual): execute pkgmgr from Python venv instead of system launcher
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / linter-shell (push) Has been cancelled
Mark stable commit / linter-python (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
The virtual environment test no longer invokes the distro-installed pkgmgr launcher (Nix-based).
Instead, it explicitly installs and activates the Python venv via make setup-venv and runs pkgmgr from there.

This aligns the test with its actual purpose (venv validation), avoids accidental execution of the Nix launcher, and fixes the failure caused by the missing run_with_github_403_retry helper in the venv workflow.

https://chatgpt.com/share/693ee224-e838-800f-8fa0-45295b2f5e20
2025-12-14 17:12:48 +01:00
Kevin Veen-Birkenbach
a3aa7b6394 git commit -am "fix(shellcheck): point source hint to repo-local retry_403.sh
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / linter-shell (push) Has been cancelled
Mark stable commit / linter-python (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
- Fix SC1091 by updating ShellCheck source hint to repo path
- Keep runtime sourcing from /usr/lib/package-manager unchanged
- CI-safe without disabling ShellCheck rules"

https://chatgpt.com/share/693edae1-6d84-800f-8556-0e54dd15b944
2025-12-14 16:42:22 +01:00
Kevin Veen-Birkenbach
724c262a4a fix(test): import mirror submodules before patching in integration tests
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / linter-shell (push) Has been cancelled
Mark stable commit / linter-python (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
Ensure pkgmgr.actions.mirror.* submodules are imported before unittest.mock.patch
to avoid AttributeError when patching dotted paths (e.g. check_cmd).
Stabilizes mirror CLI integration tests in CI.

https://chatgpt.com/share/693ed9f5-9918-800f-a880-d1238b3da1c9
2025-12-14 16:38:24 +01:00
Kevin Veen-Birkenbach
dcbe16c5f0 feat(launcher): enforce GitHub 403 retry for nix run
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / linter-shell (push) Has been cancelled
Mark stable commit / linter-python (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
- Require retry_403.sh to exist and fail hard if missing
- Source retry helper unconditionally
- Run nix flake execution via run_with_github_403_retry
- Prevent transient GitHub API rate-limit failures during nix run

https://chatgpt.com/share/693ed83e-a2e8-800f-8c1b-d5d5afeaa6ad
2025-12-14 16:31:02 +01:00
Kevin Veen-Birkenbach
f63b0a9f08 chore(ci): rename codesniffer workflows to linter
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / linter-shell (push) Has been cancelled
Mark stable commit / linter-python (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
- Rename ShellCheck workflow to linter-shell
- Rename Ruff workflow to linter-python
- Update workflow calls and dependencies accordingly

https://chatgpt.com/share/693ed61a-7490-800f-aef1-fce845e717a2
2025-12-14 16:21:57 +01:00
Kevin Veen-Birkenbach
822c418503 Added missing import
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / codesniffer-shellcheck (push) Has been cancelled
Mark stable commit / codesniffer-ruff (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
2025-12-14 16:16:37 +01:00
Kevin Veen-Birkenbach
562a6da291 test(integration): move mirror CLI tests from e2e to integration and patch side effects
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / codesniffer-shellcheck (push) Has been cancelled
Mark stable commit / codesniffer-ruff (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
https://chatgpt.com/share/693ed188-eb80-800f-8541-356e3fbd98c5
2025-12-14 16:14:17 +01:00
Kevin Veen-Birkenbach
e61b30d9af feat(tests): add unit tests for mirror context, io, commands, and remote helpers
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / codesniffer-shellcheck (push) Has been cancelled
Mark stable commit / codesniffer-ruff (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
https://chatgpt.com/share/693ed188-eb80-800f-8541-356e3fbd98c5
2025-12-14 16:02:11 +01:00
Kevin Veen-Birkenbach
27c0c7c01f **fix(mirror): derive remote repository owner and name from URL**
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / codesniffer-shellcheck (push) Has been cancelled
Mark stable commit / codesniffer-ruff (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
* Parse host, owner, and repository name directly from Git remote URLs
* Prevent provisioning under incorrect repository names
* Make Git URL the single source of truth for remote provisioning
* Improve diagnostics when URL parsing fails
2025-12-14 14:54:19 +01:00
Kevin Veen-Birkenbach
0d652d995e **feat(mirror,credentials): improve remote provisioning UX and token handling**
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / codesniffer-shellcheck (push) Has been cancelled
Mark stable commit / codesniffer-ruff (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
* Split mirror logic into atomic modules (remote check, provisioning, URL utils)
* Normalize Git remote URLs and provider host detection
* Add provider-specific token help URLs (GitHub, Gitea/Forgejo, GitLab)
* Improve keyring handling with clear warnings and install hints
* Gracefully fall back to prompt when keyring is unavailable
* Fix provider hint override logic during remote provisioning
2025-12-14 14:48:05 +01:00
Kevin Veen-Birkenbach
0e03fbbee2 Changed Mirror Name
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / codesniffer-shellcheck (push) Has been cancelled
Mark stable commit / codesniffer-ruff (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
2025-12-14 14:01:19 +01:00
58 changed files with 2477 additions and 705 deletions

View File

@@ -28,8 +28,8 @@ jobs:
test-virgin-root:
uses: ./.github/workflows/test-virgin-root.yml
codesniffer-shellcheck:
uses: ./.github/workflows/codesniffer-shellcheck.yml
linter-shell:
uses: ./.github/workflows/linter-shell.yml
codesniffer-ruff:
uses: ./.github/workflows/codesniffer-ruff.yml
linter-python:
uses: ./.github/workflows/linter-python.yml

View File

@@ -4,7 +4,7 @@ on:
workflow_call:
jobs:
codesniffer-ruff:
linter-python:
runs-on: ubuntu-latest
steps:

View File

@@ -4,7 +4,7 @@ on:
workflow_call:
jobs:
codesniffer-shellcheck:
linter-shell:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4

View File

@@ -29,16 +29,16 @@ jobs:
test-virgin-root:
uses: ./.github/workflows/test-virgin-root.yml
codesniffer-shellcheck:
uses: ./.github/workflows/codesniffer-shellcheck.yml
linter-shell:
uses: ./.github/workflows/linter-shell.yml
codesniffer-ruff:
uses: ./.github/workflows/codesniffer-ruff.yml
linter-python:
uses: ./.github/workflows/linter-python.yml
mark-stable:
needs:
- codesniffer-shellcheck
- codesniffer-ruff
- linter-shell
- linter-python
- test-unit
- test-integration
- test-env-nix

View File

@@ -19,7 +19,6 @@ jobs:
uses: actions/checkout@v4
with:
fetch-depth: 0
fetch-tags: true
- name: Checkout workflow_run commit and refresh tags
run: |
@@ -35,22 +34,30 @@ jobs:
SHA="$(git rev-parse HEAD)"
V_TAG="$(git tag --points-at "${SHA}" --list 'v*' | sort -V | tail -n1)"
[[ -n "$V_TAG" ]] || { echo "No version tag found"; exit 1; }
if [[ -z "${V_TAG}" ]]; then
echo "No version tag found for ${SHA}. Skipping publish."
echo "should_publish=false" >> "$GITHUB_OUTPUT"
exit 0
fi
VERSION="${V_TAG#v}"
STABLE_SHA="$(git rev-parse -q --verify refs/tags/stable^{commit} 2>/dev/null || true)"
IS_STABLE=false
[[ -n "${STABLE_SHA}" && "${STABLE_SHA}" == "${SHA}" ]] && IS_STABLE=true
echo "should_publish=true" >> "$GITHUB_OUTPUT"
echo "version=${VERSION}" >> "$GITHUB_OUTPUT"
echo "is_stable=${IS_STABLE}" >> "$GITHUB_OUTPUT"
- name: Set up Docker Buildx
if: ${{ steps.info.outputs.should_publish == 'true' }}
uses: docker/setup-buildx-action@v3
with:
use: true
- name: Login to GHCR
if: ${{ steps.info.outputs.should_publish == 'true' }}
uses: docker/login-action@v3
with:
registry: ghcr.io
@@ -58,6 +65,7 @@ jobs:
password: ${{ secrets.GITHUB_TOKEN }}
- name: Publish all images
if: ${{ steps.info.outputs.should_publish == 'true' }}
run: |
set -euo pipefail
OWNER="${{ github.repository_owner }}" \

View File

@@ -1,3 +1,11 @@
## [1.6.4] - 2025-12-14
* * Improved reliability of Nix installs and updates, including automatic resolution of profile conflicts and better handling of GitHub 403 rate limits.
* More stable launcher behavior in packaged and virtual-env setups.
* Enhanced mirror and remote handling: repository owner/name are derived from URLs, with smoother provisioning and clearer credential handling.
* More reliable releases and artifacts due to safer CI behavior when no version tag is present.
## [1.6.3] - 2025-12-14
* ***Fixed:*** Corrected repository path resolution so release and version logic consistently use the canonical packaging/* layout, preventing changelog and packaging files from being read or updated from incorrect locations.

View File

@@ -1,3 +1,3 @@
git@github.com:kevinveenbirkenbach/package-manager.git
ssh://git@git.veen.world:2201/kevinveenbirkenbach/pkgmgr.git
ssh://git@code.cymais.cloud:2201/kevinveenbirkenbach/pkgmgr.git
ssh://git@code.infinito.nexus:2201/kevinveenbirkenbach/pkgmgr.git

View File

@@ -32,7 +32,7 @@
rec {
pkgmgr = pyPkgs.buildPythonApplication {
pname = "package-manager";
version = "1.6.3";
version = "1.6.4";
# Use the git repo as source
src = ./.;

View File

@@ -1,7 +1,7 @@
# Maintainer: Kevin Veen-Birkenbach <info@veen.world>
pkgname=package-manager
pkgver=1.6.3
pkgver=1.6.4
pkgrel=1
pkgdesc="Local-flake wrapper for Kevin's package-manager (Nix-based)."
arch=('any')

View File

@@ -1,3 +1,12 @@
package-manager (1.6.4-1) unstable; urgency=medium
* * Improved reliability of Nix installs and updates, including automatic resolution of profile conflicts and better handling of GitHub 403 rate limits.
* More stable launcher behavior in packaged and virtual-env setups.
* Enhanced mirror and remote handling: repository owner/name are derived from URLs, with smoother provisioning and clearer credential handling.
* More reliable releases and artifacts due to safer CI behavior when no version tag is present.
-- Kevin Veen-Birkenbach <kevin@veen.world> Sun, 14 Dec 2025 19:33:07 +0100
package-manager (1.6.3-1) unstable; urgency=medium
* ***Fixed:*** Corrected repository path resolution so release and version logic consistently use the canonical packaging/* layout, preventing changelog and packaging files from being read or updated from incorrect locations.

View File

@@ -1,5 +1,5 @@
Name: package-manager
Version: 1.6.3
Version: 1.6.4
Release: 1%{?dist}
Summary: Wrapper that runs Kevin's package-manager via Nix flake
@@ -74,6 +74,12 @@ echo ">>> package-manager removed. Nix itself was not removed."
/usr/lib/package-manager/
%changelog
* Sun Dec 14 2025 Kevin Veen-Birkenbach <kevin@veen.world> - 1.6.4-1
- * Improved reliability of Nix installs and updates, including automatic resolution of profile conflicts and better handling of GitHub 403 rate limits.
* More stable launcher behavior in packaged and virtual-env setups.
* Enhanced mirror and remote handling: repository owner/name are derived from URLs, with smoother provisioning and clearer credential handling.
* More reliable releases and artifacts due to safer CI behavior when no version tag is present.
* Sun Dec 14 2025 Kevin Veen-Birkenbach <kevin@veen.world> - 1.6.3-1
- ***Fixed:*** Corrected repository path resolution so release and version logic consistently use the canonical packaging/* layout, preventing changelog and packaging files from being read or updated from incorrect locations.

View File

@@ -7,7 +7,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "package-manager"
version = "1.6.3"
version = "1.6.4"
description = "Kevin's package-manager tool (pkgmgr)"
readme = "README.md"
requires-python = ">=3.9"

View File

@@ -2,6 +2,16 @@
set -euo pipefail
FLAKE_DIR="/usr/lib/package-manager"
NIX_LIB_DIR="${FLAKE_DIR}/nix/lib"
RETRY_LIB="${NIX_LIB_DIR}/retry_403.sh"
# ---------------------------------------------------------------------------
# Hard requirement: retry helper must exist (fail if missing)
# ---------------------------------------------------------------------------
if [[ ! -f "${RETRY_LIB}" ]]; then
echo "[launcher] ERROR: Required retry helper not found: ${RETRY_LIB}" >&2
exit 1
fi
# ---------------------------------------------------------------------------
# Try to ensure that "nix" is on PATH (common locations + container user)
@@ -32,9 +42,13 @@ if ! command -v nix >/dev/null 2>&1; then
fi
# ---------------------------------------------------------------------------
# Primary path: use Nix flake if available
# Primary path: use Nix flake if available (with GitHub 403 retry)
# ---------------------------------------------------------------------------
if command -v nix >/dev/null 2>&1; then
if declare -F run_with_github_403_retry >/dev/null; then
# shellcheck source=./scripts/nix/lib/retry_403.sh
source "${RETRY_LIB}"
exec run_with_github_403_retry nix run "${FLAKE_DIR}#pkgmgr" -- "$@"
else
exec nix run "${FLAKE_DIR}#pkgmgr" -- "$@"
fi

View File

@@ -1,32 +1,49 @@
#!/usr/bin/env bash
set -euo pipefail
IMAGE="pkgmgr-$PKGMGR_DISTRO"
IMAGE="pkgmgr-${PKGMGR_DISTRO}"
echo
echo "------------------------------------------------------------"
echo ">>> Testing VENV: $IMAGE"
echo ">>> Testing VENV: ${IMAGE}"
echo "------------------------------------------------------------"
echo "[test-env-virtual] Inspect image metadata:"
docker image inspect "$IMAGE" | sed -n '1,40p'
echo "[test-env-virtual] Running: docker run --rm --entrypoint pkgmgr $IMAGE --help"
docker image inspect "${IMAGE}" | sed -n '1,40p'
echo
# Run the command and capture the output
# ------------------------------------------------------------
# Run VENV-based pkgmgr test inside container
# ------------------------------------------------------------
if OUTPUT=$(docker run --rm \
-e REINSTALL_PKGMGR=1 \
-v "pkgmgr_nix_store_${PKGMGR_DISTRO}:/nix" \
-v "$(pwd):/src" \
-v "pkgmgr_nix_cache_${PKGMGR_DISTRO}:/root/.cache/nix" \
"$IMAGE" 2>&1); then
-e REINSTALL_PKGMGR=1 \
-v "$(pwd):/src" \
-w /src \
"${IMAGE}" \
bash -lc '
set -euo pipefail
echo "[test-env-virtual] Installing pkgmgr (distro package)..."
make install
echo "[test-env-virtual] Setting up Python venv..."
make setup-venv
echo "[test-env-virtual] Activating venv..."
. "$HOME/.venvs/pkgmgr/bin/activate"
echo "[test-env-virtual] Using pkgmgr from:"
command -v pkgmgr
pkgmgr --help
' 2>&1); then
echo "$OUTPUT"
echo
echo "[test-env-virtual] SUCCESS: $IMAGE responded to 'pkgmgr --help'"
echo "[test-env-virtual] SUCCESS: venv-based pkgmgr works in ${IMAGE}"
else
echo "$OUTPUT"
echo
echo "[test-env-virtual] ERROR: $IMAGE failed to run 'pkgmgr --help'"
echo "[test-env-virtual] ERROR: venv-based pkgmgr failed in ${IMAGE}"
exit 1
fi
fi

View File

@@ -0,0 +1,100 @@
from __future__ import annotations
from typing import TYPE_CHECKING, List
from .profile import NixProfileInspector
from .retry import GitHubRateLimitRetry
from .runner import CommandRunner
from .textparse import NixConflictTextParser
if TYPE_CHECKING:
from pkgmgr.actions.install.context import RepoContext
class NixConflictResolver:
"""
Resolves nix profile file conflicts by:
1. Parsing conflicting store paths from stderr
2. Mapping them to profile remove tokens via `nix profile list --json`
3. Removing those tokens deterministically
4. Retrying install
"""
def __init__(
self,
runner: CommandRunner,
retry: GitHubRateLimitRetry,
profile: NixProfileInspector,
) -> None:
self._runner = runner
self._retry = retry
self._profile = profile
self._parser = NixConflictTextParser()
def resolve(
self,
ctx: "RepoContext",
install_cmd: str,
stdout: str,
stderr: str,
*,
output: str,
max_rounds: int = 10,
) -> bool:
quiet = bool(getattr(ctx, "quiet", False))
combined = f"{stdout}\n{stderr}"
for _ in range(max_rounds):
# 1) Extract conflicting store prefixes from nix error output
store_prefixes = self._parser.existing_store_prefixes(combined)
# 2) Resolve them to concrete remove tokens
tokens: List[str] = self._profile.find_remove_tokens_for_store_prefixes(
ctx,
self._runner,
store_prefixes,
)
# 3) Fallback: output-name based lookup (also covers nix suggesting: `nix profile remove pkgmgr`)
if not tokens:
tokens = self._profile.find_remove_tokens_for_output(ctx, self._runner, output)
if tokens:
if not quiet:
print(
"[nix] conflict detected; removing existing profile entries: "
+ ", ".join(tokens)
)
for t in tokens:
# tokens may contain things like "pkgmgr" or "pkgmgr-1" or quoted tokens (we keep raw)
self._runner.run(ctx, f"nix profile remove {t}", allow_failure=True)
res = self._retry.run_with_retry(ctx, self._runner, install_cmd)
if res.returncode == 0:
return True
combined = f"{res.stdout}\n{res.stderr}"
continue
# 4) Last-resort fallback: use textual remove tokens from stderr (“nix profile remove X”)
tokens = self._parser.remove_tokens(combined)
if tokens:
if not quiet:
print("[nix] fallback remove tokens: " + ", ".join(tokens))
for t in tokens:
self._runner.run(ctx, f"nix profile remove {t}", allow_failure=True)
res = self._retry.run_with_retry(ctx, self._runner, install_cmd)
if res.returncode == 0:
return True
combined = f"{res.stdout}\n{res.stderr}"
continue
if not quiet:
print("[nix] conflict detected but could not resolve profile entries to remove.")
return False
return False

View File

@@ -1,12 +1,12 @@
# src/pkgmgr/actions/install/installers/nix/installer.py
from __future__ import annotations
import os
import shutil
from typing import List, Tuple, TYPE_CHECKING
from typing import TYPE_CHECKING, List, Tuple
from pkgmgr.actions.install.installers.base import BaseInstaller
from .conflicts import NixConflictResolver
from .profile import NixProfileInspector
from .retry import GitHubRateLimitRetry, RetryPolicy
from .runner import CommandRunner
@@ -14,6 +14,7 @@ from .runner import CommandRunner
if TYPE_CHECKING:
from pkgmgr.actions.install.context import RepoContext
class NixFlakeInstaller(BaseInstaller):
layer = "nix"
FLAKE_FILE = "flake.nix"
@@ -22,15 +23,18 @@ class NixFlakeInstaller(BaseInstaller):
self._runner = CommandRunner()
self._retry = GitHubRateLimitRetry(policy=policy)
self._profile = NixProfileInspector()
self._conflicts = NixConflictResolver(self._runner, self._retry, self._profile)
# ------------------------------------------------------------------ #
# Compatibility: supports()
# ------------------------------------------------------------------ #
# Newer nix rejects numeric indices; we learn this at runtime and cache the decision.
self._indices_supported: bool | None = None
def supports(self, ctx: "RepoContext") -> bool:
if os.environ.get("PKGMGR_DISABLE_NIX_FLAKE_INSTALLER") == "1":
if not ctx.quiet:
print("[INFO] PKGMGR_DISABLE_NIX_FLAKE_INSTALLER=1 skipping NixFlakeInstaller.")
print(
"[INFO] PKGMGR_DISABLE_NIX_FLAKE_INSTALLER=1 "
"skipping NixFlakeInstaller."
)
return False
if shutil.which("nix") is None:
@@ -38,20 +42,12 @@ class NixFlakeInstaller(BaseInstaller):
return os.path.exists(os.path.join(ctx.repo_dir, self.FLAKE_FILE))
# ------------------------------------------------------------------ #
# Compatibility: output selection
# ------------------------------------------------------------------ #
def _profile_outputs(self, ctx: "RepoContext") -> List[Tuple[str, bool]]:
# (output_name, allow_failure)
if ctx.identifier in {"pkgmgr", "package-manager"}:
return [("pkgmgr", False), ("default", True)]
return [("default", False)]
# ------------------------------------------------------------------ #
# Compatibility: run()
# ------------------------------------------------------------------ #
def run(self, ctx: "RepoContext") -> None:
if not self.supports(ctx):
return
@@ -59,11 +55,12 @@ class NixFlakeInstaller(BaseInstaller):
outputs = self._profile_outputs(ctx)
if not ctx.quiet:
print(
msg = (
"[nix] flake detected in "
f"{ctx.identifier}, ensuring outputs: "
+ ", ".join(name for name, _ in outputs)
)
print(msg)
for output, allow_failure in outputs:
if ctx.force_update:
@@ -71,13 +68,13 @@ class NixFlakeInstaller(BaseInstaller):
else:
self._install_only(ctx, output, allow_failure)
# ------------------------------------------------------------------ #
# Core logic (unchanged semantics)
# ------------------------------------------------------------------ #
def _installable(self, ctx: "RepoContext", output: str) -> str:
return f"{ctx.repo_dir}#{output}"
# ---------------------------------------------------------------------
# Core install path
# ---------------------------------------------------------------------
def _install_only(self, ctx: "RepoContext", output: str, allow_failure: bool) -> None:
install_cmd = f"nix profile install {self._installable(ctx, output)}"
@@ -85,35 +82,56 @@ class NixFlakeInstaller(BaseInstaller):
print(f"[nix] install: {install_cmd}")
res = self._retry.run_with_retry(ctx, self._runner, install_cmd)
if res.returncode == 0:
if not ctx.quiet:
print(f"[nix] output '{output}' successfully installed.")
return
# Conflict resolver first (handles the common “existing package already provides file” case)
if self._conflicts.resolve(
ctx,
install_cmd,
res.stdout,
res.stderr,
output=output,
):
if not ctx.quiet:
print(f"[nix] output '{output}' successfully installed after conflict cleanup.")
return
if not ctx.quiet:
print(
f"[nix] install failed for '{output}' (exit {res.returncode}), "
"trying index-based upgrade/remove+install..."
"trying upgrade/remove+install..."
)
indices = self._profile.find_installed_indices_for_output(ctx, self._runner, output)
# If indices are supported, try legacy index-upgrade path.
if self._indices_supported is not False:
indices = self._profile.find_installed_indices_for_output(ctx, self._runner, output)
upgraded = False
for idx in indices:
if self._upgrade_index(ctx, idx):
upgraded = True
if not ctx.quiet:
print(f"[nix] output '{output}' successfully upgraded (index {idx}).")
upgraded = False
for idx in indices:
if self._upgrade_index(ctx, idx):
upgraded = True
if not ctx.quiet:
print(f"[nix] output '{output}' successfully upgraded (index {idx}).")
if upgraded:
return
if upgraded:
return
if indices and not ctx.quiet:
print(f"[nix] upgrade failed; removing indices {indices} and reinstalling '{output}'.")
if indices and not ctx.quiet:
print(f"[nix] upgrade failed; removing indices {indices} and reinstalling '{output}'.")
for idx in indices:
self._remove_index(ctx, idx)
for idx in indices:
self._remove_index(ctx, idx)
# If we learned indices are unsupported, immediately fall back below
if self._indices_supported is False:
self._remove_tokens_for_output(ctx, output)
else:
# indices explicitly unsupported
self._remove_tokens_for_output(ctx, output)
final = self._runner.run(ctx, install_cmd, allow_failure=True)
if final.returncode == 0:
@@ -122,17 +140,24 @@ class NixFlakeInstaller(BaseInstaller):
return
print(f"[ERROR] Failed to install Nix flake output '{output}' (exit {final.returncode})")
if not allow_failure:
raise SystemExit(final.returncode)
print(f"[WARNING] Continuing despite failure of optional output '{output}'.")
# ------------------------------------------------------------------ #
# force_update path (unchanged semantics)
# ------------------------------------------------------------------ #
# ---------------------------------------------------------------------
# force_update path
# ---------------------------------------------------------------------
def _force_upgrade_output(self, ctx: "RepoContext", output: str, allow_failure: bool) -> None:
# Prefer token path if indices unsupported (new nix)
if self._indices_supported is False:
self._remove_tokens_for_output(ctx, output)
self._install_only(ctx, output, allow_failure)
if not ctx.quiet:
print(f"[nix] output '{output}' successfully upgraded.")
return
indices = self._profile.find_installed_indices_for_output(ctx, self._runner, output)
upgraded_any = False
@@ -143,7 +168,8 @@ class NixFlakeInstaller(BaseInstaller):
print(f"[nix] output '{output}' successfully upgraded (index {idx}).")
if upgraded_any:
print(f"[nix] output '{output}' successfully upgraded.")
if not ctx.quiet:
print(f"[nix] output '{output}' successfully upgraded.")
return
if indices and not ctx.quiet:
@@ -152,17 +178,52 @@ class NixFlakeInstaller(BaseInstaller):
for idx in indices:
self._remove_index(ctx, idx)
# If we learned indices are unsupported, also remove by token to actually clear conflicts
if self._indices_supported is False:
self._remove_tokens_for_output(ctx, output)
self._install_only(ctx, output, allow_failure)
print(f"[nix] output '{output}' successfully upgraded.")
if not ctx.quiet:
print(f"[nix] output '{output}' successfully upgraded.")
# ------------------------------------------------------------------ #
# ---------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------ #
# ---------------------------------------------------------------------
def _stderr_says_indices_unsupported(self, stderr: str) -> bool:
s = (stderr or "").lower()
return "no longer supports indices" in s or "does not support indices" in s
def _upgrade_index(self, ctx: "RepoContext", idx: int) -> bool:
res = self._runner.run(ctx, f"nix profile upgrade --refresh {idx}", allow_failure=True)
cmd = f"nix profile upgrade --refresh {idx}"
res = self._runner.run(ctx, cmd, allow_failure=True)
if self._stderr_says_indices_unsupported(getattr(res, "stderr", "")):
self._indices_supported = False
return False
if self._indices_supported is None:
self._indices_supported = True
return res.returncode == 0
def _remove_index(self, ctx: "RepoContext", idx: int) -> None:
self._runner.run(ctx, f"nix profile remove {idx}", allow_failure=True)
res = self._runner.run(ctx, f"nix profile remove {idx}", allow_failure=True)
if self._stderr_says_indices_unsupported(getattr(res, "stderr", "")):
self._indices_supported = False
if self._indices_supported is None:
self._indices_supported = True
def _remove_tokens_for_output(self, ctx: "RepoContext", output: str) -> None:
tokens = self._profile.find_remove_tokens_for_output(ctx, self._runner, output)
if not tokens:
return
if not ctx.quiet:
print(f"[nix] indices unsupported; removing by token(s): {', '.join(tokens)}")
for t in tokens:
self._runner.run(ctx, f"nix profile remove {t}", allow_failure=True)

View File

@@ -1,71 +0,0 @@
from __future__ import annotations
import json
from typing import Any, List, TYPE_CHECKING
if TYPE_CHECKING:
from pkgmgr.actions.install.context import RepoContext
from .runner import CommandRunner
class NixProfileInspector:
"""
Reads and interprets `nix profile list --json` and provides helpers for
finding indices matching a given output name.
"""
def find_installed_indices_for_output(self, ctx: "RepoContext", runner: "CommandRunner", output: str) -> List[int]:
res = runner.run(ctx, "nix profile list --json", allow_failure=True)
if res.returncode != 0:
return []
try:
data = json.loads(res.stdout or "{}")
except json.JSONDecodeError:
return []
indices: List[int] = []
elements = data.get("elements")
if isinstance(elements, dict):
for idx_str, elem in elements.items():
try:
idx = int(idx_str)
except (TypeError, ValueError):
continue
if self._element_matches_output(elem, output):
indices.append(idx)
return sorted(indices)
if isinstance(elements, list):
for elem in elements:
idx = elem.get("index") if isinstance(elem, dict) else None
if isinstance(idx, int) and self._element_matches_output(elem, output):
indices.append(idx)
return sorted(indices)
return []
@staticmethod
def element_matches_output(elem: Any, output: str) -> bool:
return NixProfileInspector._element_matches_output(elem, output)
@staticmethod
def _element_matches_output(elem: Any, output: str) -> bool:
out = (output or "").strip()
if not out or not isinstance(elem, dict):
return False
candidates: List[str] = []
for k in ("attrPath", "originalUrl", "url", "storePath", "name"):
v = elem.get(k)
if isinstance(v, str) and v:
candidates.append(v)
for c in candidates:
if c == out:
return True
if f"#{out}" in c:
return True
return False

View File

@@ -0,0 +1,4 @@
from .inspector import NixProfileInspector
from .models import NixProfileEntry
__all__ = ["NixProfileInspector", "NixProfileEntry"]

View File

@@ -0,0 +1,162 @@
from __future__ import annotations
from typing import Any, List, TYPE_CHECKING
from .matcher import (
entry_matches_output,
entry_matches_store_path,
stable_unique_ints,
)
from .normalizer import normalize_elements
from .parser import parse_profile_list_json
from .result import extract_stdout_text
if TYPE_CHECKING:
# Keep these as TYPE_CHECKING-only to avoid runtime import cycles.
from pkgmgr.actions.install.context import RepoContext
from pkgmgr.core.command.runner import CommandRunner
class NixProfileInspector:
"""
Reads and inspects the user's Nix profile list (JSON).
Public API:
- list_json()
- find_installed_indices_for_output() (legacy; may not work on newer nix)
- find_indices_by_store_path() (legacy; may not work on newer nix)
- find_remove_tokens_for_output()
- find_remove_tokens_for_store_prefixes()
"""
def list_json(self, ctx: "RepoContext", runner: "CommandRunner") -> dict[str, Any]:
res = runner.run(ctx, "nix profile list --json", allow_failure=False)
raw = extract_stdout_text(res)
return parse_profile_list_json(raw)
# ---------------------------------------------------------------------
# Legacy index helpers (still useful on older nix; newer nix may reject indices)
# ---------------------------------------------------------------------
def find_installed_indices_for_output(
self,
ctx: "RepoContext",
runner: "CommandRunner",
output: str,
) -> List[int]:
data = self.list_json(ctx, runner)
entries = normalize_elements(data)
hits: List[int] = []
for e in entries:
if e.index is None:
continue
if entry_matches_output(e, output):
hits.append(e.index)
return stable_unique_ints(hits)
def find_indices_by_store_path(
self,
ctx: "RepoContext",
runner: "CommandRunner",
store_path: str,
) -> List[int]:
needle = (store_path or "").strip()
if not needle:
return []
data = self.list_json(ctx, runner)
entries = normalize_elements(data)
hits: List[int] = []
for e in entries:
if e.index is None:
continue
if entry_matches_store_path(e, needle):
hits.append(e.index)
return stable_unique_ints(hits)
# ---------------------------------------------------------------------
# New token-based helpers (works with newer nix where indices are rejected)
# ---------------------------------------------------------------------
def find_remove_tokens_for_output(
self,
ctx: "RepoContext",
runner: "CommandRunner",
output: str,
) -> List[str]:
"""
Returns profile remove tokens to remove entries matching a given output.
We always include the raw output token first because nix itself suggests:
nix profile remove pkgmgr
"""
out = (output or "").strip()
if not out:
return []
data = self.list_json(ctx, runner)
entries = normalize_elements(data)
tokens: List[str] = [out] # critical: matches nix's own suggestion for conflicts
for e in entries:
if entry_matches_output(e, out):
# Prefer removing by key/name (non-index) when possible.
# New nix rejects numeric indices; these tokens are safer.
k = (e.key or "").strip()
n = (e.name or "").strip()
if k and not k.isdigit():
tokens.append(k)
elif n and not n.isdigit():
tokens.append(n)
# stable unique preserving order
seen: set[str] = set()
uniq: List[str] = []
for t in tokens:
if t and t not in seen:
uniq.append(t)
seen.add(t)
return uniq
def find_remove_tokens_for_store_prefixes(
self,
ctx: "RepoContext",
runner: "CommandRunner",
prefixes: List[str],
) -> List[str]:
"""
Returns remove tokens for entries whose store path matches any prefix.
"""
prefixes = [(p or "").strip() for p in (prefixes or []) if p]
prefixes = [p for p in prefixes if p]
if not prefixes:
return []
data = self.list_json(ctx, runner)
entries = normalize_elements(data)
tokens: List[str] = []
for e in entries:
if not e.store_paths:
continue
if any(sp == p for sp in e.store_paths for p in prefixes):
k = (e.key or "").strip()
n = (e.name or "").strip()
if k and not k.isdigit():
tokens.append(k)
elif n and not n.isdigit():
tokens.append(n)
seen: set[str] = set()
uniq: List[str] = []
for t in tokens:
if t and t not in seen:
uniq.append(t)
seen.add(t)
return uniq

View File

@@ -0,0 +1,62 @@
from __future__ import annotations
from typing import List
from .models import NixProfileEntry
def entry_matches_output(entry: NixProfileEntry, output: str) -> bool:
"""
Heuristic matcher: output is typically a flake output name (e.g. "pkgmgr"),
and we match against name/attrPath patterns.
"""
out = (output or "").strip()
if not out:
return False
candidates = [entry.name, entry.attr_path]
for c in candidates:
c = (c or "").strip()
if not c:
continue
# Direct match
if c == out:
return True
# AttrPath contains "#<output>"
if f"#{out}" in c:
return True
# AttrPath ends with ".<output>"
if c.endswith(f".{out}"):
return True
# Name pattern "<output>-<n>" (common, e.g. pkgmgr-1)
if c.startswith(f"{out}-"):
return True
# Historical special case: repo is "package-manager" but output is "pkgmgr"
if out == "pkgmgr" and c.startswith("package-manager-"):
return True
return False
def entry_matches_store_path(entry: NixProfileEntry, store_path: str) -> bool:
needle = (store_path or "").strip()
if not needle:
return False
return any((p or "") == needle for p in entry.store_paths)
def stable_unique_ints(values: List[int]) -> List[int]:
seen: set[int] = set()
uniq: List[int] = []
for v in values:
if v in seen:
continue
uniq.append(v)
seen.add(v)
return uniq

View File

@@ -0,0 +1,17 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import List, Optional
@dataclass(frozen=True)
class NixProfileEntry:
"""
Minimal normalized representation of one nix profile element entry.
"""
key: str
index: Optional[int]
name: str
attr_path: str
store_paths: List[str]

View File

@@ -0,0 +1,128 @@
from __future__ import annotations
import re
from typing import Any, Dict, Iterable, List, Optional
from .models import NixProfileEntry
def coerce_index(key: str, entry: Dict[str, Any]) -> Optional[int]:
"""
Nix JSON schema varies:
- elements keys might be "0", "1", ...
- or might be names like "pkgmgr-1"
Some versions include an explicit index field.
We try safe options in order.
"""
k = (key or "").strip()
# 1) Classic: numeric keys
if k.isdigit():
try:
return int(k)
except Exception:
return None
# 2) Explicit index fields (schema-dependent)
for field in ("index", "id", "position"):
v = entry.get(field)
if isinstance(v, int):
return v
if isinstance(v, str) and v.strip().isdigit():
try:
return int(v.strip())
except Exception:
pass
# 3) Last resort: extract trailing number from key if it looks like "<name>-<n>"
m = re.match(r"^.+-(\d+)$", k)
if m:
try:
return int(m.group(1))
except Exception:
return None
return None
def iter_store_paths(entry: Dict[str, Any]) -> Iterable[str]:
"""
Yield all possible store paths from a nix profile JSON entry.
Nix has had schema shifts. We support common variants:
- "storePaths": ["/nix/store/..", ...]
- "storePaths": "/nix/store/.." (rare)
- "storePath": "/nix/store/.." (some variants)
- nested "outputs" dict(s) with store paths (best-effort)
"""
if not isinstance(entry, dict):
return
sp = entry.get("storePaths")
if isinstance(sp, list):
for p in sp:
if isinstance(p, str):
yield p
elif isinstance(sp, str):
yield sp
sp2 = entry.get("storePath")
if isinstance(sp2, str):
yield sp2
outs = entry.get("outputs")
if isinstance(outs, dict):
for _, ov in outs.items():
if isinstance(ov, dict):
p = ov.get("storePath")
if isinstance(p, str):
yield p
def normalize_store_path(store_path: str) -> str:
"""
Normalize store path for matching.
Currently just strips whitespace; hook for future normalization if needed.
"""
return (store_path or "").strip()
def normalize_elements(data: Dict[str, Any]) -> List[NixProfileEntry]:
"""
Converts nix profile list JSON into a list of normalized entries.
JSON formats observed:
- {"elements": {"0": {...}, "1": {...}}}
- {"elements": {"pkgmgr-1": {...}, "pkgmgr-2": {...}}}
"""
elements = data.get("elements")
if not isinstance(elements, dict):
return []
normalized: List[NixProfileEntry] = []
for k, entry in elements.items():
if not isinstance(entry, dict):
continue
idx = coerce_index(str(k), entry)
name = str(entry.get("name", "") or "")
attr = str(entry.get("attrPath", "") or "")
store_paths: List[str] = []
for p in iter_store_paths(entry):
sp = normalize_store_path(p)
if sp:
store_paths.append(sp)
normalized.append(
NixProfileEntry(
key=str(k),
index=idx,
name=name,
attr_path=attr,
store_paths=store_paths,
)
)
return normalized

View File

@@ -0,0 +1,19 @@
from __future__ import annotations
import json
from typing import Any, Dict
def parse_profile_list_json(raw: str) -> Dict[str, Any]:
"""
Parse JSON output from `nix profile list --json`.
Raises SystemExit with a helpful excerpt on parse failure.
"""
try:
return json.loads(raw)
except json.JSONDecodeError as e:
excerpt = (raw or "")[:5000]
raise SystemExit(
f"[nix] Failed to parse `nix profile list --json`: {e}\n{excerpt}"
) from e

View File

@@ -0,0 +1,28 @@
from __future__ import annotations
from typing import Any
def extract_stdout_text(result: Any) -> str:
"""
Normalize different runner return types to a stdout string.
Supported patterns:
- result is str -> returned as-is
- result is bytes/bytearray -> decoded UTF-8 (replace errors)
- result has `.stdout` (str or bytes) -> used
- fallback: str(result)
"""
if isinstance(result, str):
return result
if isinstance(result, (bytes, bytearray)):
return bytes(result).decode("utf-8", errors="replace")
stdout = getattr(result, "stdout", None)
if isinstance(stdout, str):
return stdout
if isinstance(stdout, (bytes, bytearray)):
return bytes(stdout).decode("utf-8", errors="replace")
return str(result)

View File

@@ -0,0 +1,69 @@
from __future__ import annotations
import re
from typing import TYPE_CHECKING, List, Tuple
from .runner import CommandRunner
if TYPE_CHECKING:
from pkgmgr.actions.install.context import RepoContext
class NixProfileListReader:
def __init__(self, runner: CommandRunner) -> None:
self._runner = runner
@staticmethod
def _store_prefix(path: str) -> str:
raw = (path or "").strip()
m = re.match(r"^(/nix/store/[0-9a-z]{32}-[^/ \t]+)", raw)
return m.group(1) if m else raw
def entries(self, ctx: "RepoContext") -> List[Tuple[int, str]]:
res = self._runner.run(ctx, "nix profile list", allow_failure=True)
if res.returncode != 0:
return []
entries: List[Tuple[int, str]] = []
pat = re.compile(
r"^\s*(\d+)\s+.*?(/nix/store/[0-9a-z]{32}-[^/ \t]+)",
re.MULTILINE,
)
for m in pat.finditer(res.stdout or ""):
idx_s = m.group(1)
sp = m.group(2)
try:
idx = int(idx_s)
except Exception:
continue
entries.append((idx, self._store_prefix(sp)))
seen: set[int] = set()
uniq: List[Tuple[int, str]] = []
for idx, sp in entries:
if idx not in seen:
seen.add(idx)
uniq.append((idx, sp))
return uniq
def indices_matching_store_prefixes(self, ctx: "RepoContext", prefixes: List[str]) -> List[int]:
prefixes = [self._store_prefix(p) for p in prefixes if p]
prefixes = [p for p in prefixes if p]
if not prefixes:
return []
hits: List[int] = []
for idx, sp in self.entries(ctx):
if any(sp == p for p in prefixes):
hits.append(idx)
seen: set[int] = set()
uniq: List[int] = []
for i in hits:
if i not in seen:
seen.add(i)
uniq.append(i)
return uniq

View File

@@ -0,0 +1,76 @@
from __future__ import annotations
import re
from typing import List
class NixConflictTextParser:
@staticmethod
def _store_prefix(path: str) -> str:
raw = (path or "").strip()
m = re.match(r"^(/nix/store/[0-9a-z]{32}-[^/ \t]+)", raw)
return m.group(1) if m else raw
def remove_tokens(self, text: str) -> List[str]:
pat = re.compile(
r"^\s*nix profile remove\s+([^\s'\"`]+|'[^']+'|\"[^\"]+\")\s*$",
re.MULTILINE,
)
tokens: List[str] = []
for m in pat.finditer(text or ""):
t = (m.group(1) or "").strip()
if (t.startswith("'") and t.endswith("'")) or (t.startswith('"') and t.endswith('"')):
t = t[1:-1]
if t:
tokens.append(t)
seen: set[str] = set()
uniq: List[str] = []
for t in tokens:
if t not in seen:
seen.add(t)
uniq.append(t)
return uniq
def existing_store_prefixes(self, text: str) -> List[str]:
lines = (text or "").splitlines()
prefixes: List[str] = []
in_existing = False
in_new = False
store_pat = re.compile(r"^\s*(/nix/store/[0-9a-z]{32}-[^ \t]+)")
for raw in lines:
line = raw.strip()
if "An existing package already provides the following file" in line:
in_existing = True
in_new = False
continue
if "This is the conflicting file from the new package" in line:
in_existing = False
in_new = True
continue
if in_existing:
m = store_pat.match(raw)
if m:
prefixes.append(m.group(1))
continue
_ = in_new
norm = [self._store_prefix(p) for p in prefixes if p]
seen: set[str] = set()
uniq: List[str] = []
for p in norm:
if p and p not in seen:
seen.add(p)
uniq.append(p)
return uniq

View File

@@ -0,0 +1,21 @@
# src/pkgmgr/actions/mirror/remote_check.py
from __future__ import annotations
from typing import Tuple
from pkgmgr.core.git import GitError, run_git
def probe_mirror(url: str, repo_dir: str) -> Tuple[bool, str]:
"""
Probe a remote mirror URL using `git ls-remote`.
Returns:
(True, "") on success,
(False, error_message) on failure.
"""
try:
run_git(["ls-remote", url], cwd=repo_dir)
return True, ""
except GitError as exc:
return False, str(exc)

View File

@@ -0,0 +1,70 @@
# src/pkgmgr/actions/mirror/remote_provision.py
from __future__ import annotations
from typing import List
from pkgmgr.core.remote_provisioning import ProviderHint, RepoSpec, ensure_remote_repo
from pkgmgr.core.remote_provisioning.ensure import EnsureOptions
from .context import build_context
from .git_remote import determine_primary_remote_url
from .types import Repository
from .url_utils import normalize_provider_host, parse_repo_from_git_url
def ensure_remote_repository(
repo: Repository,
repositories_base_dir: str,
all_repos: List[Repository],
preview: bool,
) -> None:
ctx = build_context(repo, repositories_base_dir, all_repos)
resolved_mirrors = ctx.resolved_mirrors
primary_url = determine_primary_remote_url(repo, resolved_mirrors)
if not primary_url:
print("[INFO] No remote URL could be derived; skipping remote provisioning.")
return
host_raw, owner_from_url, name_from_url = parse_repo_from_git_url(primary_url)
host = normalize_provider_host(host_raw)
if not host or not owner_from_url or not name_from_url:
print("[WARN] Could not derive host/owner/repository from URL; cannot ensure remote repo.")
print(f" url={primary_url!r}")
print(f" host={host!r}, owner={owner_from_url!r}, repository={name_from_url!r}")
return
print("------------------------------------------------------------")
print(f"[REMOTE ENSURE] {ctx.identifier}")
print(f"[REMOTE ENSURE] host: {host}")
print("------------------------------------------------------------")
spec = RepoSpec(
host=str(host),
owner=str(owner_from_url),
name=str(name_from_url),
private=bool(repo.get("private", True)),
description=str(repo.get("description", "")),
)
provider_kind = str(repo.get("provider", "")).strip().lower() or None
try:
result = ensure_remote_repo(
spec,
provider_hint=ProviderHint(kind=provider_kind),
options=EnsureOptions(
preview=preview,
interactive=True,
allow_prompt=True,
save_prompt_token_to_keyring=True,
),
)
print(f"[REMOTE ENSURE] {result.status.upper()}: {result.message}")
if result.url:
print(f"[REMOTE ENSURE] URL: {result.url}")
except Exception as exc: # noqa: BLE001
print(f"[ERROR] Remote provisioning failed: {exc}")
print()

View File

@@ -1,131 +1,20 @@
# src/pkgmgr/actions/mirror/setup_cmd.py
from __future__ import annotations
from typing import List, Tuple
from urllib.parse import urlparse
from pkgmgr.core.git import GitError, run_git
from pkgmgr.core.remote_provisioning import ProviderHint, RepoSpec, ensure_remote_repo
from pkgmgr.core.remote_provisioning.ensure import EnsureOptions
from typing import List
from .context import build_context
from .git_remote import determine_primary_remote_url, ensure_origin_remote
from .git_remote import ensure_origin_remote, determine_primary_remote_url
from .remote_check import probe_mirror
from .remote_provision import ensure_remote_repository
from .types import Repository
def _probe_mirror(url: str, repo_dir: str) -> Tuple[bool, str]:
"""
Probe a remote mirror URL using `git ls-remote`.
Returns:
(True, "") on success,
(False, error_message) on failure.
"""
try:
run_git(["ls-remote", url], cwd=repo_dir)
return True, ""
except GitError as exc:
return False, str(exc)
def _host_from_git_url(url: str) -> str:
url = (url or "").strip()
if not url:
return ""
if "://" in url:
parsed = urlparse(url)
netloc = (parsed.netloc or "").strip()
if "@" in netloc:
netloc = netloc.split("@", 1)[1]
# keep optional :port
return netloc
# scp-like: git@host:owner/repo.git
if "@" in url and ":" in url:
after_at = url.split("@", 1)[1]
host = after_at.split(":", 1)[0]
return host.strip()
return url.split("/", 1)[0].strip()
def _ensure_remote_repository(
repo: Repository,
repositories_base_dir: str,
all_repos: List[Repository],
preview: bool,
) -> None:
"""
Ensure that the remote repository exists using provider APIs.
This is ONLY called when ensure_remote=True.
"""
ctx = build_context(repo, repositories_base_dir, all_repos)
resolved_mirrors = ctx.resolved_mirrors
primary_url = determine_primary_remote_url(repo, resolved_mirrors)
if not primary_url:
print("[INFO] No remote URL could be derived; skipping remote provisioning.")
return
# IMPORTANT:
# - repo["provider"] is typically a provider *kind* (e.g. "github" / "gitea"),
# NOT a hostname. We derive the actual host from the remote URL.
host = _host_from_git_url(primary_url)
owner = repo.get("account")
name = repo.get("repository")
if not host or not owner or not name:
print("[WARN] Missing host/account/repository; cannot ensure remote repo.")
print(f" host={host!r}, account={owner!r}, repository={name!r}")
return
print("------------------------------------------------------------")
print(f"[REMOTE ENSURE] {ctx.identifier}")
print(f"[REMOTE ENSURE] host: {host}")
print("------------------------------------------------------------")
spec = RepoSpec(
host=str(host),
owner=str(owner),
name=str(name),
private=bool(repo.get("private", True)),
description=str(repo.get("description", "")),
)
provider_kind = str(repo.get("provider", "")).strip().lower() or None
try:
result = ensure_remote_repo(
spec,
provider_hint=ProviderHint(kind=provider_kind),
options=EnsureOptions(
preview=preview,
interactive=True,
allow_prompt=True,
save_prompt_token_to_keyring=True,
),
)
print(f"[REMOTE ENSURE] {result.status.upper()}: {result.message}")
if result.url:
print(f"[REMOTE ENSURE] URL: {result.url}")
except Exception as exc: # noqa: BLE001
# Keep action layer resilient
print(f"[ERROR] Remote provisioning failed: {exc}")
print()
def _setup_local_mirrors_for_repo(
repo: Repository,
repositories_base_dir: str,
all_repos: List[Repository],
preview: bool,
) -> None:
"""
Local setup:
- Ensure 'origin' remote exists and is sane
"""
ctx = build_context(repo, repositories_base_dir, all_repos)
print("------------------------------------------------------------")
@@ -144,17 +33,6 @@ def _setup_remote_mirrors_for_repo(
preview: bool,
ensure_remote: bool,
) -> None:
"""
Remote-side setup / validation.
Default behavior:
- Non-destructive checks using `git ls-remote`.
Optional behavior:
- If ensure_remote=True:
* Attempt to create missing repositories via provider API
* Uses TokenResolver (ENV -> keyring -> prompt)
"""
ctx = build_context(repo, repositories_base_dir, all_repos)
resolved_mirrors = ctx.resolved_mirrors
@@ -164,7 +42,7 @@ def _setup_remote_mirrors_for_repo(
print("------------------------------------------------------------")
if ensure_remote:
_ensure_remote_repository(
ensure_remote_repository(
repo,
repositories_base_dir=repositories_base_dir,
all_repos=all_repos,
@@ -178,7 +56,7 @@ def _setup_remote_mirrors_for_repo(
print()
return
ok, error_message = _probe_mirror(primary_url, ctx.repo_dir)
ok, error_message = probe_mirror(primary_url, ctx.repo_dir)
if ok:
print(f"[OK] primary: {primary_url}")
else:
@@ -190,7 +68,7 @@ def _setup_remote_mirrors_for_repo(
return
for name, url in sorted(resolved_mirrors.items()):
ok, error_message = _probe_mirror(url, ctx.repo_dir)
ok, error_message = probe_mirror(url, ctx.repo_dir)
if ok:
print(f"[OK] {name}: {url}")
else:
@@ -210,19 +88,6 @@ def setup_mirrors(
remote: bool = True,
ensure_remote: bool = False,
) -> None:
"""
Setup mirrors for the selected repositories.
local:
- Configure local Git remotes (ensure 'origin' exists).
remote:
- Non-destructive remote checks using `git ls-remote`.
ensure_remote:
- If True, attempt to create missing remote repositories via provider APIs.
- This is explicit and NEVER enabled implicitly.
"""
for repo in selected_repos:
if local:
_setup_local_mirrors_for_repo(

View File

@@ -0,0 +1,111 @@
# src/pkgmgr/actions/mirror/url_utils.py
from __future__ import annotations
from urllib.parse import urlparse
from typing import Optional, Tuple
def hostport_from_git_url(url: str) -> Tuple[str, Optional[str]]:
url = (url or "").strip()
if not url:
return "", None
if "://" in url:
parsed = urlparse(url)
netloc = (parsed.netloc or "").strip()
if "@" in netloc:
netloc = netloc.split("@", 1)[1]
if netloc.startswith("[") and "]" in netloc:
host = netloc[1:netloc.index("]")]
rest = netloc[netloc.index("]") + 1 :]
port = rest[1:] if rest.startswith(":") else None
return host.strip(), (port.strip() if port else None)
if ":" in netloc:
host, port = netloc.rsplit(":", 1)
return host.strip(), (port.strip() or None)
return netloc.strip(), None
if "@" in url and ":" in url:
after_at = url.split("@", 1)[1]
host = after_at.split(":", 1)[0].strip()
return host, None
host = url.split("/", 1)[0].strip()
return host, None
def normalize_provider_host(host: str) -> str:
host = (host or "").strip()
if not host:
return ""
if host.startswith("[") and "]" in host:
host = host[1:host.index("]")]
if ":" in host and host.count(":") == 1:
host = host.rsplit(":", 1)[0]
return host.strip().lower()
def _strip_dot_git(name: str) -> str:
n = (name or "").strip()
if n.lower().endswith(".git"):
return n[:-4]
return n
def parse_repo_from_git_url(url: str) -> Tuple[str, Optional[str], Optional[str]]:
"""
Parse (host, owner, repo_name) from common Git remote URLs.
Supports:
- ssh://git@host:2201/owner/repo.git
- https://host/owner/repo.git
- git@host:owner/repo.git
- host/owner/repo(.git) (best-effort)
Returns:
(host, owner, repo_name) with owner/repo possibly None if not derivable.
"""
u = (url or "").strip()
if not u:
return "", None, None
# URL-style (ssh://, https://, http://)
if "://" in u:
parsed = urlparse(u)
host = (parsed.hostname or "").strip()
path = (parsed.path or "").strip("/")
parts = [p for p in path.split("/") if p]
if len(parts) >= 2:
owner = parts[0]
repo_name = _strip_dot_git(parts[1])
return host, owner, repo_name
return host, None, None
# SCP-like: git@host:owner/repo.git
if "@" in u and ":" in u:
after_at = u.split("@", 1)[1]
host = after_at.split(":", 1)[0].strip()
path = after_at.split(":", 1)[1].strip("/")
parts = [p for p in path.split("/") if p]
if len(parts) >= 2:
owner = parts[0]
repo_name = _strip_dot_git(parts[1])
return host, owner, repo_name
return host, None, None
# Fallback: host/owner/repo.git
host = u.split("/", 1)[0].strip()
rest = u.split("/", 1)[1] if "/" in u else ""
parts = [p for p in rest.strip("/").split("/") if p]
if len(parts) >= 2:
owner = parts[0]
repo_name = _strip_dot_git(parts[1])
return host, owner, repo_name
return host, None, None

View File

@@ -9,15 +9,33 @@ from ..types import KeyringUnavailableError, TokenRequest, TokenResult
def _import_keyring():
"""
Import python-keyring.
Raises:
KeyringUnavailableError if:
- library is missing
- no backend is configured / usable
- import fails for any reason
"""
try:
import keyring # type: ignore
return keyring
except Exception as exc: # noqa: BLE001
raise KeyringUnavailableError(
"python-keyring is not available or no backend is configured."
"python-keyring is not installed."
) from exc
# Some environments have keyring installed but no usable backend.
# We do a lightweight "backend sanity check" by attempting to read the backend.
try:
_ = keyring.get_keyring()
except Exception as exc: # noqa: BLE001
raise KeyringUnavailableError(
"python-keyring is installed but no usable keyring backend is configured."
) from exc
return keyring
@dataclass(frozen=True)
class KeyringTokenProvider:

View File

@@ -9,6 +9,37 @@ from typing import Optional
from ..types import TokenRequest, TokenResult
def _token_help_url(provider_kind: str, host: str) -> Optional[str]:
"""
Return a provider-specific URL where a user can create/get an API token.
Keep this conservative and stable:
- GitHub: official token settings URL
- Gitea/Forgejo: common settings path on the given host
- GitLab: common personal access token path
"""
kind = (provider_kind or "").strip().lower()
h = (host or "").strip()
# GitHub (cloud)
if kind == "github":
return "https://github.com/settings/tokens"
# Gitea / Forgejo (self-hosted)
if kind in ("gitea", "forgejo"):
# Typical UI path: Settings -> Applications -> Access Tokens
# In many installations this is available at /user/settings/applications
base = f"https://{h}".rstrip("/")
return f"{base}/user/settings/applications"
# GitLab (cloud or self-hosted)
if kind == "gitlab":
base = "https://gitlab.com" if not h else f"https://{h}".rstrip("/")
return f"{base}/-/profile/personal_access_tokens"
return None
@dataclass(frozen=True)
class PromptTokenProvider:
"""Interactively prompt for a token.
@@ -25,6 +56,11 @@ class PromptTokenProvider:
return None
owner_info = f" (owner: {request.owner})" if request.owner else ""
help_url = _token_help_url(request.provider_kind, request.host)
if help_url:
print(f"[INFO] Create/get your token here: {help_url}")
prompt = f"Enter API token for {request.provider_kind} on {request.host}{owner_info}: "
token = (getpass(prompt) or "").strip()
if not token:

View File

@@ -1,13 +1,14 @@
# src/pkgmgr/core/credentials/resolver.py
from __future__ import annotations
import sys
from dataclasses import dataclass
from typing import Optional
from .providers.env import EnvTokenProvider
from .providers.keyring import KeyringTokenProvider
from .providers.prompt import PromptTokenProvider
from .types import NoCredentialsError, TokenRequest, TokenResult
from .types import KeyringUnavailableError, NoCredentialsError, TokenRequest, TokenResult
@dataclass(frozen=True)
@@ -26,6 +27,26 @@ class TokenResolver:
self._env = EnvTokenProvider()
self._keyring = KeyringTokenProvider()
self._prompt = PromptTokenProvider()
self._warned_keyring: bool = False
def _warn_keyring_unavailable(self, exc: Exception) -> None:
if self._warned_keyring:
return
self._warned_keyring = True
msg = str(exc).strip() or "Keyring is unavailable."
print("[WARN] Keyring support is not available.", file=sys.stderr)
print(f" {msg}", file=sys.stderr)
print(" Tokens will NOT be persisted securely.", file=sys.stderr)
print("", file=sys.stderr)
print(" To enable secure token storage, install python-keyring:", file=sys.stderr)
print(" pip install keyring", file=sys.stderr)
print("", file=sys.stderr)
print(" Or install via system packages:", file=sys.stderr)
print(" sudo apt install python3-keyring", file=sys.stderr)
print(" sudo pacman -S python-keyring", file=sys.stderr)
print(" sudo dnf install python3-keyring", file=sys.stderr)
print("", file=sys.stderr)
def get_token(
self,
@@ -47,9 +68,11 @@ class TokenResolver:
kr_res = self._keyring.get(request)
if kr_res:
return kr_res
except KeyringUnavailableError as exc:
# Show a helpful warning once, then continue (prompt fallback).
self._warn_keyring_unavailable(exc)
except Exception:
# Keyring missing/unavailable: ignore to allow prompt (workstations)
# or to fail cleanly below (headless CI without prompt).
# Unknown keyring errors: do not block prompting; still avoid hard crash.
pass
# 3) Prompt (optional)
@@ -59,6 +82,8 @@ class TokenResolver:
if opts.save_prompt_token_to_keyring:
try:
self._keyring.set(request, prompt_res.token)
except KeyringUnavailableError as exc:
self._warn_keyring_unavailable(exc)
except Exception:
# If keyring cannot store, still use token for this run.
pass

View File

@@ -64,10 +64,12 @@ def ensure_remote_repo(
provider = reg.resolve(spec.host)
if provider_hint and provider_hint.kind:
forced = provider_hint.kind.strip().lower()
provider = next(
forced_provider = next(
(p for p in reg.providers if getattr(p, "kind", "").lower() == forced),
None,
)
if forced_provider is not None:
provider = forced_provider
if provider is None:
raise UnsupportedProviderError(f"No provider matched host: {spec.host}")

View File

@@ -1,164 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
E2E integration tests for the `pkgmgr mirror` command family.
Covered commands:
- pkgmgr mirror --help
- pkgmgr mirror list --preview --all
- pkgmgr mirror diff --preview --all
- pkgmgr mirror merge config file --preview --all
- pkgmgr mirror setup --preview --all
- pkgmgr mirror check --preview --all
- pkgmgr mirror provision --preview --all
All commands are executed via the real CLI entry point (main module).
With --preview enabled, all operations are non-destructive and safe
to run inside CI containers.
"""
import io
import runpy
import sys
import unittest
from contextlib import redirect_stdout, redirect_stderr
class TestIntegrationMirrorCommands(unittest.TestCase):
"""
End-to-end tests for `pkgmgr mirror` commands.
"""
# ------------------------------------------------------------
# Helper
# ------------------------------------------------------------
def _run_pkgmgr(self, args):
"""
Execute pkgmgr with the given arguments and return captured output.
- Treat SystemExit(0) or SystemExit(None) as success.
- Any other exit code is considered a test failure.
"""
original_argv = list(sys.argv)
buffer = io.StringIO()
cmd_repr = "pkgmgr " + " ".join(args)
try:
sys.argv = ["pkgmgr"] + list(args)
try:
with redirect_stdout(buffer), redirect_stderr(buffer):
runpy.run_module("pkgmgr", run_name="__main__")
except SystemExit as exc:
code = exc.code if isinstance(exc.code, int) else None
if code not in (0, None):
raise AssertionError(
"%r failed with exit code %r.\n\nOutput:\n%s"
% (cmd_repr, exc.code, buffer.getvalue())
)
return buffer.getvalue()
finally:
sys.argv = original_argv
# ------------------------------------------------------------
# Tests
# ------------------------------------------------------------
def test_mirror_help(self):
"""
`pkgmgr mirror --help` should run without error and print usage info.
"""
output = self._run_pkgmgr(["mirror", "--help"])
self.assertIn("usage:", output)
self.assertIn("pkgmgr mirror", output)
def test_mirror_list_preview_all(self):
"""
`pkgmgr mirror list --preview --all`
"""
output = self._run_pkgmgr(
["mirror", "list", "--preview", "--all"]
)
self.assertTrue(
output.strip(),
"Expected output from mirror list",
)
def test_mirror_diff_preview_all(self):
"""
`pkgmgr mirror diff --preview --all`
"""
output = self._run_pkgmgr(
["mirror", "diff", "--preview", "--all"]
)
self.assertTrue(
output.strip(),
"Expected output from mirror diff",
)
def test_mirror_merge_config_to_file_preview_all(self):
"""
`pkgmgr mirror merge config file --preview --all`
"""
output = self._run_pkgmgr(
[
"mirror",
"merge",
"config",
"file",
"--preview",
"--all",
]
)
self.assertTrue(
output.strip(),
"Expected output from mirror merge (config -> file)",
)
def test_mirror_setup_preview_all(self):
"""
`pkgmgr mirror setup --preview --all`
"""
output = self._run_pkgmgr(
["mirror", "setup", "--preview", "--all"]
)
self.assertTrue(
output.strip(),
"Expected output from mirror setup",
)
def test_mirror_check_preview_all(self):
"""
`pkgmgr mirror check --preview --all`
Performs non-destructive remote checks (git ls-remote).
"""
output = self._run_pkgmgr(
["mirror", "check", "--preview", "--all"]
)
self.assertTrue(
output.strip(),
"Expected output from mirror check",
)
def test_mirror_provision_preview_all(self):
"""
`pkgmgr mirror provision --preview --all`
In preview mode this MUST NOT create remote repositories.
"""
output = self._run_pkgmgr(
["mirror", "provision", "--preview", "--all"]
)
self.assertTrue(
output.strip(),
"Expected output from mirror provision (preview)",
)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,172 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
CLI integration tests for `pkgmgr mirror`.
These tests validate:
- CLI argument parsing
- command dispatch
- command orchestration
All side effects (git, network, remote provisioning, filesystem writes)
are patched to keep tests deterministic and CI-safe.
"""
from __future__ import annotations
import importlib
import io
import os
import runpy
import sys
import unittest
from contextlib import ExitStack, redirect_stderr, redirect_stdout
from typing import Dict, List, Optional
from unittest.mock import MagicMock, PropertyMock, patch
class TestIntegrationMirrorCommands(unittest.TestCase):
"""
Integration tests for `pkgmgr mirror` commands.
"""
def _run_pkgmgr(self, args: List[str], extra_env: Optional[Dict[str, str]] = None) -> str:
"""
Execute pkgmgr with the given arguments and return captured output.
- Treat SystemExit(0) or SystemExit(None) as success.
- Any other exit code is considered a test failure.
- Mirror commands are patched to avoid network/destructive operations.
"""
original_argv = list(sys.argv)
original_env = dict(os.environ)
buffer = io.StringIO()
cmd_repr = "pkgmgr " + " ".join(args)
# Shared dummy context used by multiple mirror commands
dummy_ctx = MagicMock()
dummy_ctx.identifier = "dummy-repo"
dummy_ctx.repo_dir = "/tmp/dummy-repo"
dummy_ctx.config_mirrors = {"origin": "git@github.com:alice/repo.git"}
dummy_ctx.file_mirrors = {"backup": "ssh://git@git.example:2201/alice/repo.git"}
type(dummy_ctx).resolved_mirrors = PropertyMock(
return_value={
"origin": "git@github.com:alice/repo.git",
"backup": "ssh://git@git.example:2201/alice/repo.git",
}
)
# Helper: patch with create=True so missing symbols don't explode.
# IMPORTANT: patch() does not auto-import submodules when resolving dotted names.
def _p(target: str, **kwargs):
module_name = target.rsplit(".", 1)[0]
try:
importlib.import_module(module_name)
except ModuleNotFoundError:
# If the module truly doesn't exist, create=True may still allow patching
# in some cases, but dotted resolution can still fail. Best-effort.
pass
return patch(target, create=True, **kwargs)
# Fake result for remote provisioning (preview-safe)
def _fake_ensure_remote_repo(spec, provider_hint=None, options=None):
# Safety: E2E should only ever call this in preview mode
if options is not None and getattr(options, "preview", False) is not True:
raise AssertionError(
f"{cmd_repr} attempted ensure_remote_repo without preview=True in E2E."
)
r = MagicMock()
r.status = "preview"
r.message = "Preview mode (E2E patched): no remote provisioning performed."
r.url = None
return r
try:
sys.argv = ["pkgmgr"] + list(args)
if extra_env:
os.environ.update(extra_env)
with ExitStack() as stack:
# build_context is imported directly in these modules:
stack.enter_context(_p("pkgmgr.actions.mirror.list_cmd.build_context", return_value=dummy_ctx))
stack.enter_context(_p("pkgmgr.actions.mirror.diff_cmd.build_context", return_value=dummy_ctx))
stack.enter_context(_p("pkgmgr.actions.mirror.merge_cmd.build_context", return_value=dummy_ctx))
stack.enter_context(_p("pkgmgr.actions.mirror.setup_cmd.build_context", return_value=dummy_ctx))
stack.enter_context(_p("pkgmgr.actions.mirror.remote_provision.build_context", return_value=dummy_ctx))
# Deterministic remote probing (covers setup + likely check implementations)
stack.enter_context(_p("pkgmgr.actions.mirror.remote_check.probe_mirror", return_value=(True, "")))
stack.enter_context(_p("pkgmgr.actions.mirror.setup_cmd.probe_mirror", return_value=(True, "")))
stack.enter_context(_p("pkgmgr.actions.mirror.git_remote.is_remote_reachable", return_value=True))
# setup_cmd imports ensure_origin_remote directly:
stack.enter_context(_p("pkgmgr.actions.mirror.setup_cmd.ensure_origin_remote", return_value=None))
# Extra safety: if any code calls git_remote.ensure_origin_remote directly
stack.enter_context(_p("pkgmgr.actions.mirror.git_remote.ensure_origin_remote", return_value=None))
# remote provisioning: remote_provision imports ensure_remote_repo directly from core:
stack.enter_context(
_p(
"pkgmgr.actions.mirror.remote_provision.ensure_remote_repo",
side_effect=_fake_ensure_remote_repo,
)
)
# Extra safety: if anything calls remote_check.run_git directly, make it inert
stack.enter_context(_p("pkgmgr.actions.mirror.remote_check.run_git", return_value="dummy"))
with redirect_stdout(buffer), redirect_stderr(buffer):
try:
runpy.run_module("pkgmgr", run_name="__main__")
except SystemExit as exc:
code = exc.code if isinstance(exc.code, int) else None
if code not in (0, None):
raise AssertionError(
"%r failed with exit code %r.\n\nOutput:\n%s"
% (cmd_repr, exc.code, buffer.getvalue())
)
return buffer.getvalue()
finally:
sys.argv = original_argv
os.environ.clear()
os.environ.update(original_env)
# ------------------------------------------------------------
# Tests
# ------------------------------------------------------------
def test_mirror_help(self) -> None:
output = self._run_pkgmgr(["mirror", "--help"])
self.assertIn("usage:", output.lower())
self.assertIn("mirror", output.lower())
def test_mirror_list_preview_all(self) -> None:
output = self._run_pkgmgr(["mirror", "list", "--preview", "--all"])
self.assertTrue(output.strip(), "Expected output from mirror list")
def test_mirror_diff_preview_all(self) -> None:
output = self._run_pkgmgr(["mirror", "diff", "--preview", "--all"])
self.assertTrue(output.strip(), "Expected output from mirror diff")
def test_mirror_merge_config_to_file_preview_all(self) -> None:
output = self._run_pkgmgr(["mirror", "merge", "config", "file", "--preview", "--all"])
self.assertTrue(output.strip(), "Expected output from mirror merge (config -> file)")
def test_mirror_setup_preview_all(self) -> None:
output = self._run_pkgmgr(["mirror", "setup", "--preview", "--all"])
self.assertTrue(output.strip(), "Expected output from mirror setup")
def test_mirror_check_preview_all(self) -> None:
output = self._run_pkgmgr(["mirror", "check", "--preview", "--all"])
self.assertTrue(output.strip(), "Expected output from mirror check")
def test_mirror_provision_preview_all(self) -> None:
output = self._run_pkgmgr(["mirror", "provision", "--preview", "--all"])
self.assertTrue(output.strip(), "Expected output from mirror provision (preview)")
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,63 @@
from __future__ import annotations
import json
import unittest
from dataclasses import dataclass
@dataclass
class FakeRunResult:
"""
Mimics your runner returning a structured result object.
"""
returncode: int
stdout: str
stderr: str = ""
class FakeRunner:
"""
Minimal runner stub: returns exactly what we configure.
"""
def __init__(self, result):
self._result = result
def run(self, ctx, cmd: str, allow_failure: bool = False):
return self._result
class TestE2ENixProfileListJsonParsing(unittest.TestCase):
"""
This test verifies that NixProfileInspector can parse `nix profile list --json`
regardless of whether the CommandRunner returns:
- raw stdout string, OR
- a RunResult-like object with a `.stdout` attribute.
"""
def test_list_json_accepts_raw_string(self) -> None:
from pkgmgr.actions.install.installers.nix.profile import NixProfileInspector
payload = {"elements": {"pkgmgr-1": {"attrPath": "packages.x86_64-linux.pkgmgr"}}}
raw = json.dumps(payload)
runner = FakeRunner(raw)
inspector = NixProfileInspector()
data = inspector.list_json(ctx=None, runner=runner)
self.assertEqual(data["elements"]["pkgmgr-1"]["attrPath"], "packages.x86_64-linux.pkgmgr")
def test_list_json_accepts_runresult_object(self) -> None:
from pkgmgr.actions.install.installers.nix.profile import NixProfileInspector
payload = {"elements": {"pkgmgr-1": {"attrPath": "packages.x86_64-linux.pkgmgr"}}}
raw = json.dumps(payload)
runner = FakeRunner(FakeRunResult(returncode=0, stdout=raw))
inspector = NixProfileInspector()
data = inspector.list_json(ctx=None, runner=runner)
self.assertEqual(data["elements"]["pkgmgr-1"]["attrPath"], "packages.x86_64-linux.pkgmgr")
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,44 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Optional
@dataclass
class FakeRunResult:
returncode: int
stdout: str = ""
stderr: str = ""
class FakeRunner:
"""
Minimal runner stub compatible with:
- CommandRunner.run(ctx, cmd, allow_failure=...)
- Generic runner.run(ctx, cmd, allow_failure=...)
"""
def __init__(self, mapping: Optional[dict[str, Any]] = None, default: Any = None):
self.mapping = mapping or {}
self.default = default if default is not None else FakeRunResult(0, "", "")
self.calls: list[tuple[Any, str, bool]] = []
def run(self, ctx, cmd: str, allow_failure: bool = False):
self.calls.append((ctx, cmd, allow_failure))
return self.mapping.get(cmd, self.default)
class FakeRetry:
"""
Mimics GitHubRateLimitRetry.run_with_retry(ctx, runner, cmd)
"""
def __init__(self, results: list[FakeRunResult]):
self._results = list(results)
self.calls: list[str] = []
def run_with_retry(self, ctx, runner, cmd: str):
self.calls.append(cmd)
if self._results:
return self._results.pop(0)
return FakeRunResult(0, "", "")

View File

@@ -0,0 +1,58 @@
from __future__ import annotations
import unittest
from pkgmgr.actions.install.installers.nix.conflicts import NixConflictResolver
from ._fakes import FakeRunResult, FakeRunner, FakeRetry
class DummyCtx:
quiet = True
class TestNixConflictResolver(unittest.TestCase):
def test_resolve_removes_tokens_and_retries_success(self) -> None:
ctx = DummyCtx()
install_cmd = "nix profile install /repo#default"
stderr = '''
error: An existing package already provides the following file:
/nix/store/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-pkgmgr/bin/pkgmgr
'''
runner = FakeRunner(mapping={
"nix profile remove pkgmgr": FakeRunResult(0, "", ""),
})
retry = FakeRetry(results=[FakeRunResult(0, "", "")])
class FakeProfile:
def find_remove_tokens_for_store_prefixes(self, ctx, runner, prefixes):
return []
def find_remove_tokens_for_output(self, ctx, runner, output):
return ["pkgmgr"]
resolver = NixConflictResolver(runner=runner, retry=retry, profile=FakeProfile())
ok = resolver.resolve(ctx, install_cmd, stdout="", stderr=stderr, output="pkgmgr", max_rounds=2)
self.assertTrue(ok)
self.assertIn("nix profile remove pkgmgr", [c[1] for c in runner.calls])
def test_resolve_uses_textual_remove_tokens_last_resort(self) -> None:
ctx = DummyCtx()
install_cmd = "nix profile install /repo#default"
stderr = "hint: try:\n nix profile remove 'pkgmgr-1'\n"
runner = FakeRunner(mapping={
"nix profile remove pkgmgr-1": FakeRunResult(0, "", ""),
})
retry = FakeRetry(results=[FakeRunResult(0, "", "")])
class FakeProfile:
def find_remove_tokens_for_store_prefixes(self, ctx, runner, prefixes):
return []
def find_remove_tokens_for_output(self, ctx, runner, output):
return []
resolver = NixConflictResolver(runner=runner, retry=retry, profile=FakeProfile())
ok = resolver.resolve(ctx, install_cmd, stdout="", stderr=stderr, output="pkgmgr", max_rounds=2)
self.assertTrue(ok)
self.assertIn("nix profile remove pkgmgr-1", [c[1] for c in runner.calls])

View File

@@ -0,0 +1,62 @@
from __future__ import annotations
import json
import unittest
from pkgmgr.actions.install.installers.nix.profile import NixProfileInspector
from ._fakes import FakeRunResult, FakeRunner
class TestNixProfileInspector(unittest.TestCase):
def test_list_json_accepts_raw_string(self) -> None:
payload = {"elements": {"pkgmgr-1": {"attrPath": "packages.x86_64-linux.pkgmgr"}}}
raw = json.dumps(payload)
runner = FakeRunner(default=raw)
insp = NixProfileInspector()
data = insp.list_json(ctx=None, runner=runner)
self.assertEqual(data["elements"]["pkgmgr-1"]["attrPath"], "packages.x86_64-linux.pkgmgr")
def test_list_json_accepts_result_object(self) -> None:
payload = {"elements": {"pkgmgr-1": {"attrPath": "packages.x86_64-linux.pkgmgr"}}}
raw = json.dumps(payload)
runner = FakeRunner(default=FakeRunResult(0, stdout=raw))
insp = NixProfileInspector()
data = insp.list_json(ctx=None, runner=runner)
self.assertEqual(data["elements"]["pkgmgr-1"]["attrPath"], "packages.x86_64-linux.pkgmgr")
def test_find_remove_tokens_for_output_includes_output_first(self) -> None:
payload = {
"elements": {
"pkgmgr-1": {"name": "pkgmgr-1", "attrPath": "packages.x86_64-linux.pkgmgr"},
"default-1": {"name": "default-1", "attrPath": "packages.x86_64-linux.default"},
}
}
raw = json.dumps(payload)
runner = FakeRunner(default=FakeRunResult(0, stdout=raw))
insp = NixProfileInspector()
tokens = insp.find_remove_tokens_for_output(ctx=None, runner=runner, output="pkgmgr")
self.assertEqual(tokens[0], "pkgmgr")
self.assertIn("pkgmgr-1", tokens)
def test_find_remove_tokens_for_store_prefixes(self) -> None:
payload = {
"elements": {
"pkgmgr-1": {
"name": "pkgmgr-1",
"attrPath": "packages.x86_64-linux.pkgmgr",
"storePaths": ["/nix/store/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-pkgmgr"],
},
"something": {
"name": "other",
"attrPath": "packages.x86_64-linux.other",
"storePaths": ["/nix/store/bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb-other"],
},
}
}
raw = json.dumps(payload)
runner = FakeRunner(default=FakeRunResult(0, stdout=raw))
insp = NixProfileInspector()
tokens = insp.find_remove_tokens_for_store_prefixes(
ctx=None, runner=runner, prefixes=["/nix/store/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-pkgmgr"]
)
self.assertIn("pkgmgr-1", tokens)

View File

@@ -0,0 +1,88 @@
from __future__ import annotations
import unittest
from unittest.mock import MagicMock
from pkgmgr.actions.install.installers.nix.installer import NixFlakeInstaller
from ._fakes import FakeRunResult
class DummyCtx:
def __init__(self, identifier: str = "x", repo_dir: str = "/repo", quiet: bool = True, force_update: bool = False):
self.identifier = identifier
self.repo_dir = repo_dir
self.quiet = quiet
self.force_update = force_update
class TestNixFlakeInstallerCore(unittest.TestCase):
def test_install_only_success_returns(self) -> None:
ins = NixFlakeInstaller()
ins.supports = MagicMock(return_value=True)
ins._retry = MagicMock()
ins._retry.run_with_retry.return_value = FakeRunResult(0, "", "")
ins._conflicts = MagicMock()
ins._profile = MagicMock()
ins._runner = MagicMock()
ctx = DummyCtx(identifier="lib", repo_dir="/repo", quiet=True)
ins.run(ctx)
ins._retry.run_with_retry.assert_called()
def test_conflict_resolver_success_short_circuits(self) -> None:
ins = NixFlakeInstaller()
ins.supports = MagicMock(return_value=True)
ins._retry = MagicMock()
ins._retry.run_with_retry.return_value = FakeRunResult(1, "out", "err")
ins._conflicts = MagicMock()
ins._conflicts.resolve.return_value = True
ins._profile = MagicMock()
ins._runner = MagicMock()
ctx = DummyCtx(identifier="lib", repo_dir="/repo", quiet=True)
ins.run(ctx)
ins._conflicts.resolve.assert_called()
def test_mandatory_failure_raises_systemexit(self) -> None:
ins = NixFlakeInstaller()
ins.supports = MagicMock(return_value=True)
ins._retry = MagicMock()
ins._retry.run_with_retry.return_value = FakeRunResult(2, "", "no")
ins._conflicts = MagicMock()
ins._conflicts.resolve.return_value = False
ins._profile = MagicMock()
ins._profile.find_installed_indices_for_output.return_value = []
ins._runner = MagicMock()
ins._runner.run.return_value = FakeRunResult(2, "", "")
ctx = DummyCtx(identifier="lib", repo_dir="/repo", quiet=True)
with self.assertRaises(SystemExit) as cm:
ins.run(ctx)
self.assertEqual(cm.exception.code, 2)
def test_optional_failure_does_not_raise(self) -> None:
ins = NixFlakeInstaller()
ins.supports = MagicMock(return_value=True)
results = [
FakeRunResult(0, "", ""),
FakeRunResult(2, "", ""),
]
def run_with_retry(ctx, runner, cmd):
return results.pop(0)
ins._retry = MagicMock()
ins._retry.run_with_retry.side_effect = run_with_retry
ins._conflicts = MagicMock()
ins._conflicts.resolve.return_value = False
ins._profile = MagicMock()
ins._profile.find_installed_indices_for_output.return_value = []
ins._runner = MagicMock()
ins._runner.run.return_value = FakeRunResult(2, "", "")
ctx = DummyCtx(identifier="pkgmgr", repo_dir="/repo", quiet=True)
ins.run(ctx) # must not raise

View File

@@ -115,105 +115,7 @@ class TestNixFlakeInstaller(unittest.TestCase):
install_cmds = self._install_cmds_from_calls(subproc_mock.call_args_list)
self.assertEqual(install_cmds, [f"nix profile install {self.repo_dir}#default"])
def test_nix_flake_run_mandatory_failure_raises(self) -> None:
"""
For a generic repository, 'default' is mandatory.
A non-zero return code must raise SystemExit with that code.
"""
ctx = DummyCtx(identifier="some-lib", repo_dir=self.repo_dir)
installer = NixFlakeInstaller()
# retry layer does one attempt (non-403), then fallback does final attempt => 2 installs
install_results = [self._cp(1), self._cp(1)]
def fake_subprocess_run(cmd, *args, **kwargs):
if isinstance(cmd, str) and cmd.startswith("nix profile list --json"):
return self._cp(0, stdout='{"elements": []}', stderr="")
if isinstance(cmd, str) and cmd.startswith("nix profile install "):
return install_results.pop(0)
return self._cp(0)
buf = io.StringIO()
with patch("pkgmgr.actions.install.installers.nix.installer.shutil.which") as which_mock, patch(
"pkgmgr.actions.install.installers.nix.installer.os.path.exists", return_value=True
), patch(
"pkgmgr.actions.install.installers.nix.runner.subprocess.run", side_effect=fake_subprocess_run
) as subproc_mock, redirect_stdout(buf):
self._enable_nix_in_module(which_mock)
self.assertTrue(installer.supports(ctx))
with self.assertRaises(SystemExit) as cm:
installer.run(ctx)
self.assertEqual(cm.exception.code, 1)
out = buf.getvalue()
self.assertIn("[nix] install: nix profile install", out)
self.assertIn("[ERROR] Failed to install Nix flake output 'default' (exit 1)", out)
install_cmds = self._install_cmds_from_calls(subproc_mock.call_args_list)
self.assertEqual(
install_cmds,
[
f"nix profile install {self.repo_dir}#default",
f"nix profile install {self.repo_dir}#default",
],
)
def test_nix_flake_run_optional_failure_does_not_raise(self) -> None:
"""
For pkgmgr/package-manager repositories:
- 'pkgmgr' output is mandatory
- 'default' output is optional
Failure of optional output must not raise.
"""
ctx = DummyCtx(identifier="pkgmgr", repo_dir=self.repo_dir)
installer = NixFlakeInstaller()
# pkgmgr success (1 call), default fails (2 calls: attempt + final)
install_results = [self._cp(0), self._cp(1), self._cp(1)]
def fake_subprocess_run(cmd, *args, **kwargs):
if isinstance(cmd, str) and cmd.startswith("nix profile list --json"):
return self._cp(0, stdout='{"elements": []}', stderr="")
if isinstance(cmd, str) and cmd.startswith("nix profile install "):
return install_results.pop(0)
return self._cp(0)
buf = io.StringIO()
with patch("pkgmgr.actions.install.installers.nix.installer.shutil.which") as which_mock, patch(
"pkgmgr.actions.install.installers.nix.installer.os.path.exists", return_value=True
), patch(
"pkgmgr.actions.install.installers.nix.runner.subprocess.run", side_effect=fake_subprocess_run
) as subproc_mock, redirect_stdout(buf):
self._enable_nix_in_module(which_mock)
self.assertTrue(installer.supports(ctx))
installer.run(ctx) # must NOT raise
out = buf.getvalue()
# Should announce both outputs
self.assertIn("ensuring outputs: pkgmgr, default", out)
# First output ok
self.assertIn("[nix] output 'pkgmgr' successfully installed.", out)
# Second output failed but no raise
self.assertIn("[ERROR] Failed to install Nix flake output 'default' (exit 1)", out)
self.assertIn("[WARNING] Continuing despite failure of optional output 'default'.", out)
install_cmds = self._install_cmds_from_calls(subproc_mock.call_args_list)
self.assertEqual(
install_cmds,
[
f"nix profile install {self.repo_dir}#pkgmgr",
f"nix profile install {self.repo_dir}#default",
f"nix profile install {self.repo_dir}#default",
],
)
def test_nix_flake_supports_respects_disable_env(self) -> None:
"""
PKGMGR_DISABLE_NIX_FLAKE_INSTALLER=1 must disable the installer,

View File

@@ -0,0 +1,37 @@
from __future__ import annotations
import unittest
from pkgmgr.actions.install.installers.nix.profile.models import NixProfileEntry
from pkgmgr.actions.install.installers.nix.profile.matcher import entry_matches_output, entry_matches_store_path
class TestMatcher(unittest.TestCase):
def _e(self, name: str, attr: str) -> NixProfileEntry:
return NixProfileEntry(
key="pkgmgr-1",
index=None,
name=name,
attr_path=attr,
store_paths=["/nix/store/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-pkgmgr"],
)
def test_matches_direct_name(self) -> None:
self.assertTrue(entry_matches_output(self._e("pkgmgr", ""), "pkgmgr"))
def test_matches_attrpath_hash(self) -> None:
self.assertTrue(entry_matches_output(self._e("", "github:me/repo#pkgmgr"), "pkgmgr"))
def test_matches_attrpath_dot_suffix(self) -> None:
self.assertTrue(entry_matches_output(self._e("", "packages.x86_64-linux.pkgmgr"), "pkgmgr"))
def test_matches_name_with_suffix_number(self) -> None:
self.assertTrue(entry_matches_output(self._e("pkgmgr-1", ""), "pkgmgr"))
def test_package_manager_special_case(self) -> None:
self.assertTrue(entry_matches_output(self._e("package-manager-2", ""), "pkgmgr"))
def test_store_path_match(self) -> None:
entry = self._e("pkgmgr-1", "")
self.assertTrue(entry_matches_store_path(entry, "/nix/store/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-pkgmgr"))
self.assertFalse(entry_matches_store_path(entry, "/nix/store/cccccccccccccccccccccccccccccccc-zzz"))

View File

@@ -0,0 +1,39 @@
from __future__ import annotations
import unittest
from pkgmgr.actions.install.installers.nix.profile.normalizer import coerce_index, normalize_elements
class TestNormalizer(unittest.TestCase):
def test_coerce_index_numeric_key(self) -> None:
self.assertEqual(coerce_index("3", {"name": "x"}), 3)
def test_coerce_index_explicit_field(self) -> None:
self.assertEqual(coerce_index("pkgmgr-1", {"index": 7}), 7)
self.assertEqual(coerce_index("pkgmgr-1", {"id": "8"}), 8)
def test_coerce_index_trailing_number(self) -> None:
self.assertEqual(coerce_index("pkgmgr-42", {"name": "x"}), 42)
def test_normalize_elements_handles_missing_elements(self) -> None:
self.assertEqual(normalize_elements({}), [])
def test_normalize_elements_collects_store_paths(self) -> None:
data = {
"elements": {
"pkgmgr-1": {
"name": "pkgmgr-1",
"attrPath": "packages.x86_64-linux.pkgmgr",
"storePaths": ["/nix/store/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-pkgmgr"],
},
"2": {
"name": "foo",
"attrPath": "packages.x86_64-linux.default",
"storePath": "/nix/store/bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb-foo",
},
}
}
entries = normalize_elements(data)
self.assertEqual(len(entries), 2)
self.assertTrue(entries[0].store_paths)

View File

@@ -0,0 +1,18 @@
from __future__ import annotations
import json
import unittest
from pkgmgr.actions.install.installers.nix.profile.parser import parse_profile_list_json
class TestParseProfileListJson(unittest.TestCase):
def test_parses_valid_json(self) -> None:
payload = {"elements": {"0": {"name": "pkgmgr"}}}
raw = json.dumps(payload)
self.assertEqual(parse_profile_list_json(raw)["elements"]["0"]["name"], "pkgmgr")
def test_raises_systemexit_on_invalid_json(self) -> None:
with self.assertRaises(SystemExit) as cm:
parse_profile_list_json("{not json")
self.assertIn("Failed to parse", str(cm.exception))

View File

@@ -0,0 +1,29 @@
from __future__ import annotations
import unittest
from pkgmgr.actions.install.installers.nix.profile_list import NixProfileListReader
from ._fakes import FakeRunResult, FakeRunner
class TestNixProfileListReader(unittest.TestCase):
def test_entries_parses_indices_and_store_prefixes(self) -> None:
out = '''
0 something /nix/store/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-pkgmgr
1 something /nix/store/bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb-foo
'''
runner = FakeRunner(default=FakeRunResult(0, stdout=out))
reader = NixProfileListReader(runner=runner)
entries = reader.entries(ctx=None)
self.assertEqual(entries[0][0], 0)
self.assertTrue(entries[0][1].startswith("/nix/store/"))
def test_indices_matching_store_prefixes(self) -> None:
out = " 7 x /nix/store/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-pkgmgr\n"
runner = FakeRunner(default=FakeRunResult(0, stdout=out))
reader = NixProfileListReader(runner=runner)
hits = reader.indices_matching_store_prefixes(
ctx=None,
prefixes=["/nix/store/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-pkgmgr"],
)
self.assertEqual(hits, [7])

View File

@@ -0,0 +1,29 @@
from __future__ import annotations
import unittest
from pkgmgr.actions.install.installers.nix.profile.result import extract_stdout_text
class TestExtractStdoutText(unittest.TestCase):
def test_accepts_string(self) -> None:
self.assertEqual(extract_stdout_text("hello"), "hello")
def test_accepts_bytes(self) -> None:
self.assertEqual(extract_stdout_text(b"hi"), "hi")
def test_accepts_object_with_stdout_str(self) -> None:
class R:
stdout = "ok"
self.assertEqual(extract_stdout_text(R()), "ok")
def test_accepts_object_with_stdout_bytes(self) -> None:
class R:
stdout = b"ok"
self.assertEqual(extract_stdout_text(R()), "ok")
def test_fallback_str(self) -> None:
class R:
def __str__(self) -> str:
return "repr"
self.assertEqual(extract_stdout_text(R()), "repr")

View File

@@ -0,0 +1,30 @@
from __future__ import annotations
import unittest
from pkgmgr.actions.install.installers.nix.textparse import NixConflictTextParser
class TestNixConflictTextParser(unittest.TestCase):
def test_remove_tokens_parses_unquoted_and_quoted(self) -> None:
t = NixConflictTextParser()
text = '''
nix profile remove pkgmgr
nix profile remove 'pkgmgr-1'
nix profile remove "default-2"
'''
tokens = t.remove_tokens(text)
self.assertEqual(tokens, ["pkgmgr", "pkgmgr-1", "default-2"])
def test_existing_store_prefixes_extracts_existing_section_only(self) -> None:
t = NixConflictTextParser()
text = '''
error: An existing package already provides the following file:
/nix/store/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-pkgmgr/bin/pkgmgr
/nix/store/bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb-pkgmgr/share/doc
This is the conflicting file from the new package:
/nix/store/cccccccccccccccccccccccccccccccc-pkgmgr/bin/pkgmgr
'''
prefixes = t.existing_store_prefixes(text)
self.assertEqual(len(prefixes), 2)
self.assertTrue(prefixes[0].startswith("/nix/store/"))

View File

@@ -0,0 +1 @@
# Unit test package for pkgmgr.actions.mirror

View File

@@ -0,0 +1,51 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import unittest
from unittest.mock import patch
from pkgmgr.actions.mirror.context import build_context
class TestMirrorContext(unittest.TestCase):
"""
Unit tests for building RepoMirrorContext from repo + filesystem.
"""
@patch("pkgmgr.actions.mirror.context.read_mirrors_file")
@patch("pkgmgr.actions.mirror.context.load_config_mirrors")
@patch("pkgmgr.actions.mirror.context.get_repo_dir")
@patch("pkgmgr.actions.mirror.context.get_repo_identifier")
def test_build_context_bundles_config_and_file_mirrors(
self,
mock_identifier,
mock_repo_dir,
mock_load_config,
mock_read_file,
) -> None:
mock_identifier.return_value = "id"
mock_repo_dir.return_value = "/tmp/repo"
mock_load_config.return_value = {"origin": "git@github.com:alice/repo.git"}
mock_read_file.return_value = {"backup": "ssh://git@backup/alice/repo.git"}
repo = {"provider": "github.com", "account": "alice", "repository": "repo"}
ctx = build_context(repo, repositories_base_dir="/base", all_repos=[repo])
self.assertEqual(ctx.identifier, "id")
self.assertEqual(ctx.repo_dir, "/tmp/repo")
self.assertEqual(ctx.config_mirrors, {"origin": "git@github.com:alice/repo.git"})
self.assertEqual(ctx.file_mirrors, {"backup": "ssh://git@backup/alice/repo.git"})
self.assertEqual(
ctx.resolved_mirrors,
{
"origin": "git@github.com:alice/repo.git",
"backup": "ssh://git@backup/alice/repo.git",
},
)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,77 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import io
import unittest
from contextlib import redirect_stdout
from unittest.mock import MagicMock, PropertyMock, patch
from pkgmgr.actions.mirror.diff_cmd import diff_mirrors
class TestDiffCmd(unittest.TestCase):
"""
Unit tests for mirror diff output.
"""
@patch("pkgmgr.actions.mirror.diff_cmd.build_context")
def test_diff_mirrors_reports_only_in_config_and_only_in_file(self, mock_build_context) -> None:
ctx = MagicMock()
ctx.identifier = "id"
ctx.repo_dir = "/tmp/repo"
ctx.config_mirrors = {"origin": "a", "cfgonly": "b"}
ctx.file_mirrors = {"origin": "a", "fileonly": "c"}
type(ctx).resolved_mirrors = PropertyMock(
return_value={"origin": "a", "cfgonly": "b", "fileonly": "c"}
)
mock_build_context.return_value = ctx
buf = io.StringIO()
with redirect_stdout(buf):
diff_mirrors(selected_repos=[{}], repositories_base_dir="/base", all_repos=[])
out = buf.getvalue()
self.assertIn("[ONLY IN CONFIG] cfgonly: b", out)
self.assertIn("[ONLY IN FILE] fileonly: c", out)
@patch("pkgmgr.actions.mirror.diff_cmd.build_context")
def test_diff_mirrors_reports_url_mismatch(self, mock_build_context) -> None:
ctx = MagicMock()
ctx.identifier = "id"
ctx.repo_dir = "/tmp/repo"
ctx.config_mirrors = {"origin": "a"}
ctx.file_mirrors = {"origin": "different"}
type(ctx).resolved_mirrors = PropertyMock(return_value={"origin": "different"})
mock_build_context.return_value = ctx
buf = io.StringIO()
with redirect_stdout(buf):
diff_mirrors(selected_repos=[{}], repositories_base_dir="/base", all_repos=[])
out = buf.getvalue()
self.assertIn("[URL MISMATCH]", out)
self.assertIn("config: a", out)
self.assertIn("file: different", out)
@patch("pkgmgr.actions.mirror.diff_cmd.build_context")
def test_diff_mirrors_reports_in_sync(self, mock_build_context) -> None:
ctx = MagicMock()
ctx.identifier = "id"
ctx.repo_dir = "/tmp/repo"
ctx.config_mirrors = {"origin": "a"}
ctx.file_mirrors = {"origin": "a"}
type(ctx).resolved_mirrors = PropertyMock(return_value={"origin": "a"})
mock_build_context.return_value = ctx
buf = io.StringIO()
with redirect_stdout(buf):
diff_mirrors(selected_repos=[{}], repositories_base_dir="/base", all_repos=[])
out = buf.getvalue()
self.assertIn("[OK] Mirrors in config and MIRRORS file are in sync.", out)
if __name__ == "__main__":
unittest.main()

View File

@@ -4,10 +4,13 @@
from __future__ import annotations
import unittest
from unittest.mock import patch
from pkgmgr.actions.mirror.git_remote import (
build_default_ssh_url,
determine_primary_remote_url,
current_origin_url,
has_origin_remote,
)
from pkgmgr.actions.mirror.types import MirrorMap, Repository
@@ -25,10 +28,7 @@ class TestMirrorGitRemote(unittest.TestCase):
}
url = build_default_ssh_url(repo)
self.assertEqual(
url,
"git@github.com:kevinveenbirkenbach/package-manager.git",
)
self.assertEqual(url, "git@github.com:kevinveenbirkenbach/package-manager.git")
def test_build_default_ssh_url_with_port(self) -> None:
repo: Repository = {
@@ -39,24 +39,18 @@ class TestMirrorGitRemote(unittest.TestCase):
}
url = build_default_ssh_url(repo)
self.assertEqual(
url,
"ssh://git@code.cymais.cloud:2201/kevinveenbirkenbach/pkgmgr.git",
)
self.assertEqual(url, "ssh://git@code.cymais.cloud:2201/kevinveenbirkenbach/pkgmgr.git")
def test_build_default_ssh_url_missing_fields_returns_none(self) -> None:
repo: Repository = {
"provider": "github.com",
"account": "kevinveenbirkenbach",
# "repository" fehlt absichtlich
}
url = build_default_ssh_url(repo)
self.assertIsNone(url)
def test_determine_primary_remote_url_prefers_origin_in_resolved_mirrors(
self,
) -> None:
def test_determine_primary_remote_url_prefers_origin_in_resolved_mirrors(self) -> None:
repo: Repository = {
"provider": "github.com",
"account": "kevinveenbirkenbach",
@@ -68,10 +62,7 @@ class TestMirrorGitRemote(unittest.TestCase):
}
url = determine_primary_remote_url(repo, mirrors)
self.assertEqual(
url,
"git@github.com:kevinveenbirkenbach/package-manager.git",
)
self.assertEqual(url, "git@github.com:kevinveenbirkenbach/package-manager.git")
def test_determine_primary_remote_url_uses_any_mirror_if_no_origin(self) -> None:
repo: Repository = {
@@ -85,11 +76,7 @@ class TestMirrorGitRemote(unittest.TestCase):
}
url = determine_primary_remote_url(repo, mirrors)
# Alphabetisch sortiert: backup, mirror2 → backup gewinnt
self.assertEqual(
url,
"ssh://git@git.veen.world:2201/kevinveenbirkenbach/pkgmgr.git",
)
self.assertEqual(url, "ssh://git@git.veen.world:2201/kevinveenbirkenbach/pkgmgr.git")
def test_determine_primary_remote_url_falls_back_to_default_ssh(self) -> None:
repo: Repository = {
@@ -100,10 +87,38 @@ class TestMirrorGitRemote(unittest.TestCase):
mirrors: MirrorMap = {}
url = determine_primary_remote_url(repo, mirrors)
self.assertEqual(
url,
"git@github.com:kevinveenbirkenbach/package-manager.git",
)
self.assertEqual(url, "git@github.com:kevinveenbirkenbach/package-manager.git")
@patch("pkgmgr.actions.mirror.git_remote.run_git")
def test_current_origin_url_returns_value(self, mock_run_git) -> None:
mock_run_git.return_value = "git@github.com:alice/repo.git\n"
self.assertEqual(current_origin_url("/tmp/repo"), "git@github.com:alice/repo.git")
mock_run_git.assert_called_once_with(["remote", "get-url", "origin"], cwd="/tmp/repo")
@patch("pkgmgr.actions.mirror.git_remote.run_git")
def test_current_origin_url_returns_none_on_git_error(self, mock_run_git) -> None:
from pkgmgr.core.git import GitError
mock_run_git.side_effect = GitError("fail")
self.assertIsNone(current_origin_url("/tmp/repo"))
@patch("pkgmgr.actions.mirror.git_remote.run_git")
def test_has_origin_remote_true(self, mock_run_git) -> None:
mock_run_git.return_value = "origin\nupstream\n"
self.assertTrue(has_origin_remote("/tmp/repo"))
mock_run_git.assert_called_once_with(["remote"], cwd="/tmp/repo")
@patch("pkgmgr.actions.mirror.git_remote.run_git")
def test_has_origin_remote_false_on_missing_remote(self, mock_run_git) -> None:
mock_run_git.return_value = "upstream\n"
self.assertFalse(has_origin_remote("/tmp/repo"))
@patch("pkgmgr.actions.mirror.git_remote.run_git")
def test_has_origin_remote_false_on_git_error(self, mock_run_git) -> None:
from pkgmgr.core.git import GitError
mock_run_git.side_effect = GitError("fail")
self.assertFalse(has_origin_remote("/tmp/repo"))
if __name__ == "__main__":

View File

@@ -7,10 +7,7 @@ import os
import tempfile
import unittest
from pkgmgr.actions.mirror.io import (
load_config_mirrors,
read_mirrors_file,
)
from pkgmgr.actions.mirror.io import load_config_mirrors, read_mirrors_file, write_mirrors_file
class TestMirrorIO(unittest.TestCase):
@@ -18,117 +15,96 @@ class TestMirrorIO(unittest.TestCase):
Unit tests for pkgmgr.actions.mirror.io helpers.
"""
# ------------------------------------------------------------------
# load_config_mirrors
# ------------------------------------------------------------------
def test_load_config_mirrors_from_dict(self) -> None:
def test_load_config_mirrors_from_dict_filters_empty(self) -> None:
repo = {
"mirrors": {
"origin": "ssh://git@example.com/account/repo.git",
"backup": "ssh://git@backup/account/repo.git",
"empty": "",
"none": None,
"backup": "",
"invalid": None,
}
}
mirrors = load_config_mirrors(repo)
self.assertEqual(mirrors, {"origin": "ssh://git@example.com/account/repo.git"})
self.assertEqual(
mirrors,
{
"origin": "ssh://git@example.com/account/repo.git",
"backup": "ssh://git@backup/account/repo.git",
},
)
def test_load_config_mirrors_from_list(self) -> None:
def test_load_config_mirrors_from_list_filters_invalid_entries(self) -> None:
repo = {
"mirrors": [
{"name": "origin", "url": "ssh://git@example.com/account/repo.git"},
{"name": "backup", "url": "ssh://git@backup/account/repo.git"},
{"name": "", "url": "ssh://git@invalid/ignored.git"},
{"name": "missing-url"},
"not-a-dict",
{"name": "backup", "url": ""},
{"name": "", "url": "ssh://git@example.com/empty-name.git"},
{"url": "ssh://git@example.com/missing-name.git"},
]
}
mirrors = load_config_mirrors(repo)
self.assertEqual(
mirrors,
{
"origin": "ssh://git@example.com/account/repo.git",
"backup": "ssh://git@backup/account/repo.git",
},
)
self.assertEqual(mirrors, {"origin": "ssh://git@example.com/account/repo.git"})
def test_load_config_mirrors_empty_when_missing(self) -> None:
repo = {}
mirrors = load_config_mirrors(repo)
self.assertEqual(mirrors, {})
self.assertEqual(load_config_mirrors({}), {})
# ------------------------------------------------------------------
# read_mirrors_file
# ------------------------------------------------------------------
def test_read_mirrors_file_with_named_and_url_only_entries(self) -> None:
"""
Ensure that the MIRRORS file format is parsed correctly:
- 'name url' → exact name
- 'url' → auto name derived from netloc (host[:port]),
with numeric suffix if duplicated.
"""
def test_read_mirrors_file_parses_named_entries(self) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
mirrors_path = os.path.join(tmpdir, "MIRRORS")
content = "\n".join(
[
"# comment",
"",
"origin ssh://git@example.com/account/repo.git",
"https://github.com/kevinveenbirkenbach/package-manager",
"https://github.com/kevinveenbirkenbach/another-repo",
"ssh://git@git.veen.world:2201/kevinveenbirkenbach/pkgmgr.git",
]
)
with open(mirrors_path, "w", encoding="utf-8") as fh:
fh.write(content + "\n")
p = os.path.join(tmpdir, "MIRRORS")
with open(p, "w", encoding="utf-8") as fh:
fh.write("origin ssh://git@example.com/account/repo.git\n")
mirrors = read_mirrors_file(tmpdir)
# 'origin' is preserved as given
self.assertIn("origin", mirrors)
self.assertEqual(
mirrors["origin"],
"ssh://git@example.com/account/repo.git",
)
self.assertEqual(mirrors, {"origin": "ssh://git@example.com/account/repo.git"})
# Two GitHub URLs → auto names: github.com, github.com2
github_urls = {
mirrors.get("github.com"),
mirrors.get("github.com2"),
}
self.assertIn(
"https://github.com/kevinveenbirkenbach/package-manager",
github_urls,
)
self.assertIn(
"https://github.com/kevinveenbirkenbach/another-repo",
github_urls,
)
def test_read_mirrors_file_url_only_uses_netloc_basename_and_suffix(self) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
p = os.path.join(tmpdir, "MIRRORS")
with open(p, "w", encoding="utf-8") as fh:
fh.write(
"\n".join(
[
"https://github.com/alice/repo1",
"https://github.com/alice/repo2",
"ssh://git@git.veen.world:2201/alice/repo3.git",
]
)
+ "\n"
)
mirrors = read_mirrors_file(tmpdir)
self.assertIn("github.com", mirrors)
self.assertIn("github.com2", mirrors)
self.assertEqual(mirrors["github.com"], "https://github.com/alice/repo1")
self.assertEqual(mirrors["github.com2"], "https://github.com/alice/repo2")
# SSH-URL mit User-Teil → netloc ist "git@git.veen.world:2201"
# → host = "git@git.veen.world"
self.assertIn("git@git.veen.world", mirrors)
self.assertEqual(
mirrors["git@git.veen.world"],
"ssh://git@git.veen.world:2201/kevinveenbirkenbach/pkgmgr.git",
)
self.assertEqual(mirrors["git@git.veen.world"], "ssh://git@git.veen.world:2201/alice/repo3.git")
def test_read_mirrors_file_missing_returns_empty(self) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
mirrors = read_mirrors_file(tmpdir) # no MIRRORS file
self.assertEqual(mirrors, {})
self.assertEqual(read_mirrors_file(tmpdir), {})
def test_write_mirrors_file_writes_sorted_lines(self) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
mirrors = {
"b": "ssh://b.example/repo.git",
"a": "ssh://a.example/repo.git",
}
write_mirrors_file(tmpdir, mirrors, preview=False)
p = os.path.join(tmpdir, "MIRRORS")
self.assertTrue(os.path.exists(p))
with open(p, "r", encoding="utf-8") as fh:
content = fh.read()
self.assertEqual(content, "a ssh://a.example/repo.git\nb ssh://b.example/repo.git\n")
def test_write_mirrors_file_preview_does_not_create_file(self) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
mirrors = {"a": "ssh://a.example/repo.git"}
write_mirrors_file(tmpdir, mirrors, preview=True)
p = os.path.join(tmpdir, "MIRRORS")
self.assertFalse(os.path.exists(p))
if __name__ == "__main__":

View File

@@ -0,0 +1,72 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import io
import unittest
from contextlib import redirect_stdout
from unittest.mock import MagicMock, PropertyMock, patch
from pkgmgr.actions.mirror.list_cmd import list_mirrors
class TestListCmd(unittest.TestCase):
"""
Unit tests for mirror list output.
"""
@patch("pkgmgr.actions.mirror.list_cmd.build_context")
def test_list_mirrors_all_sources_prints_sections(self, mock_build_context) -> None:
ctx = MagicMock()
ctx.identifier = "id"
ctx.repo_dir = "/tmp/repo"
ctx.config_mirrors = {"origin": "a"}
ctx.file_mirrors = {"backup": "b"}
type(ctx).resolved_mirrors = PropertyMock(return_value={"origin": "a", "backup": "b"})
mock_build_context.return_value = ctx
buf = io.StringIO()
with redirect_stdout(buf):
list_mirrors(
selected_repos=[{}],
repositories_base_dir="/base",
all_repos=[],
source="all",
)
out = buf.getvalue()
self.assertIn("[config mirrors]", out)
self.assertIn("[MIRRORS file]", out)
self.assertIn("[resolved mirrors]", out)
self.assertIn("origin: a", out)
self.assertIn("backup: b", out)
@patch("pkgmgr.actions.mirror.list_cmd.build_context")
def test_list_mirrors_config_only(self, mock_build_context) -> None:
ctx = MagicMock()
ctx.identifier = "id"
ctx.repo_dir = "/tmp/repo"
ctx.config_mirrors = {"origin": "a"}
ctx.file_mirrors = {"backup": "b"}
type(ctx).resolved_mirrors = PropertyMock(return_value={"origin": "a", "backup": "b"})
mock_build_context.return_value = ctx
buf = io.StringIO()
with redirect_stdout(buf):
list_mirrors(
selected_repos=[{}],
repositories_base_dir="/base",
all_repos=[],
source="config",
)
out = buf.getvalue()
self.assertIn("[config mirrors]", out)
self.assertIn("origin: a", out)
self.assertNotIn("[MIRRORS file]", out)
self.assertNotIn("[resolved mirrors]", out)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,52 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import unittest
from unittest.mock import patch
from pkgmgr.actions.mirror.remote_check import probe_mirror
from pkgmgr.core.git import GitError
class TestRemoteCheck(unittest.TestCase):
"""
Unit tests for non-destructive remote probing (git ls-remote).
"""
@patch("pkgmgr.actions.mirror.remote_check.run_git")
def test_probe_mirror_success_returns_true_and_empty_message(self, mock_run_git) -> None:
mock_run_git.return_value = "dummy-output"
ok, message = probe_mirror(
"ssh://git@code.example.org:2201/alice/repo.git",
"/tmp/some-repo",
)
self.assertTrue(ok)
self.assertEqual(message, "")
mock_run_git.assert_called_once_with(
["ls-remote", "ssh://git@code.example.org:2201/alice/repo.git"],
cwd="/tmp/some-repo",
)
@patch("pkgmgr.actions.mirror.remote_check.run_git")
def test_probe_mirror_failure_returns_false_and_error_message(self, mock_run_git) -> None:
mock_run_git.side_effect = GitError("Git command failed (simulated)")
ok, message = probe_mirror(
"ssh://git@code.example.org:2201/alice/repo.git",
"/tmp/some-repo",
)
self.assertFalse(ok)
self.assertIn("Git command failed", message)
mock_run_git.assert_called_once_with(
["ls-remote", "ssh://git@code.example.org:2201/alice/repo.git"],
cwd="/tmp/some-repo",
)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,114 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import unittest
from unittest.mock import MagicMock, PropertyMock, patch
from pkgmgr.actions.mirror.remote_provision import ensure_remote_repository
class TestRemoteProvision(unittest.TestCase):
"""
Unit tests for remote provisioning wrapper logic (action layer).
"""
@patch("pkgmgr.actions.mirror.remote_provision.ensure_remote_repo")
@patch("pkgmgr.actions.mirror.remote_provision.determine_primary_remote_url")
@patch("pkgmgr.actions.mirror.remote_provision.build_context")
def test_ensure_remote_repository_builds_spec_from_url_and_calls_core(
self,
mock_build_context,
mock_determine_primary,
mock_ensure_remote_repo,
) -> None:
ctx = MagicMock()
type(ctx).resolved_mirrors = PropertyMock(
return_value={"origin": "ssh://git@git.veen.world:2201/alice/repo.git"}
)
ctx.identifier = "repo-id"
mock_build_context.return_value = ctx
mock_determine_primary.return_value = "ssh://git@git.veen.world:2201/alice/repo.git"
result = MagicMock()
result.status = "created"
result.message = "Repository created (user)."
result.url = "https://git.veen.world/alice/repo"
mock_ensure_remote_repo.return_value = result
repo = {
"provider": "gitea",
"account": "SHOULD_NOT_BE_USED_ANYMORE",
"repository": "SHOULD_NOT_BE_USED_ANYMORE",
"private": True,
"description": "desc",
}
ensure_remote_repository(
repo=repo,
repositories_base_dir="/base",
all_repos=[],
preview=False,
)
self.assertTrue(mock_ensure_remote_repo.called)
called_spec = mock_ensure_remote_repo.call_args[0][0]
self.assertEqual(called_spec.host, "git.veen.world")
self.assertEqual(called_spec.owner, "alice")
self.assertEqual(called_spec.name, "repo")
@patch("pkgmgr.actions.mirror.remote_provision.ensure_remote_repo")
@patch("pkgmgr.actions.mirror.remote_provision.determine_primary_remote_url")
@patch("pkgmgr.actions.mirror.remote_provision.build_context")
def test_ensure_remote_repository_skips_when_no_primary_url(
self,
mock_build_context,
mock_determine_primary,
mock_ensure_remote_repo,
) -> None:
ctx = MagicMock()
type(ctx).resolved_mirrors = PropertyMock(return_value={})
ctx.identifier = "repo-id"
mock_build_context.return_value = ctx
mock_determine_primary.return_value = None
ensure_remote_repository(
repo={"provider": "gitea"},
repositories_base_dir="/base",
all_repos=[],
preview=False,
)
mock_ensure_remote_repo.assert_not_called()
@patch("pkgmgr.actions.mirror.remote_provision.ensure_remote_repo")
@patch("pkgmgr.actions.mirror.remote_provision.determine_primary_remote_url")
@patch("pkgmgr.actions.mirror.remote_provision.build_context")
def test_ensure_remote_repository_skips_when_url_not_parseable(
self,
mock_build_context,
mock_determine_primary,
mock_ensure_remote_repo,
) -> None:
ctx = MagicMock()
type(ctx).resolved_mirrors = PropertyMock(
return_value={"origin": "ssh://git@host:2201/not-enough-parts"}
)
ctx.identifier = "repo-id"
mock_build_context.return_value = ctx
mock_determine_primary.return_value = "ssh://git@host:2201/not-enough-parts"
ensure_remote_repository(
repo={"provider": "gitea"},
repositories_base_dir="/base",
all_repos=[],
preview=False,
)
mock_ensure_remote_repo.assert_not_called()
if __name__ == "__main__":
unittest.main()

View File

@@ -4,55 +4,120 @@
from __future__ import annotations
import unittest
from unittest.mock import patch
from unittest.mock import MagicMock, PropertyMock, patch
from pkgmgr.actions.mirror.setup_cmd import _probe_mirror
from pkgmgr.core.git import GitError
from pkgmgr.actions.mirror.setup_cmd import setup_mirrors
class TestMirrorSetupCmd(unittest.TestCase):
"""
Unit tests for the non-destructive remote probing logic in setup_cmd.
Unit tests for mirror setup orchestration (local + remote).
"""
@patch("pkgmgr.actions.mirror.setup_cmd.run_git")
def test_probe_mirror_success_returns_true_and_empty_message(
@patch("pkgmgr.actions.mirror.setup_cmd.ensure_origin_remote")
@patch("pkgmgr.actions.mirror.setup_cmd.build_context")
def test_setup_mirrors_local_calls_ensure_origin_remote(
self,
mock_run_git,
mock_build_context,
mock_ensure_origin,
) -> None:
"""
If run_git returns successfully, _probe_mirror must report (True, "").
"""
mock_run_git.return_value = "dummy-output"
ctx = MagicMock()
ctx.identifier = "repo-id"
ctx.repo_dir = "/tmp/repo"
ctx.config_mirrors = {}
ctx.file_mirrors = {}
type(ctx).resolved_mirrors = PropertyMock(return_value={})
mock_build_context.return_value = ctx
ok, message = _probe_mirror(
"ssh://git@code.cymais.cloud:2201/kevinveenbirkenbach/pkgmgr.git",
"/tmp/some-repo",
repo = {"provider": "github.com", "account": "alice", "repository": "repo"}
setup_mirrors(
selected_repos=[repo],
repositories_base_dir="/base",
all_repos=[repo],
preview=True,
local=True,
remote=False,
ensure_remote=False,
)
self.assertTrue(ok)
self.assertEqual(message, "")
mock_run_git.assert_called_once()
mock_ensure_origin.assert_called_once()
args, kwargs = mock_ensure_origin.call_args
self.assertEqual(args[0], repo)
self.assertEqual(kwargs.get("preview"), True)
@patch("pkgmgr.actions.mirror.setup_cmd.run_git")
def test_probe_mirror_failure_returns_false_and_error_message(
@patch("pkgmgr.actions.mirror.setup_cmd.ensure_remote_repository")
@patch("pkgmgr.actions.mirror.setup_cmd.probe_mirror")
@patch("pkgmgr.actions.mirror.setup_cmd.build_context")
def test_setup_mirrors_remote_provisions_when_enabled(
self,
mock_run_git,
mock_build_context,
mock_probe,
mock_ensure_remote_repository,
) -> None:
"""
If run_git raises GitError, _probe_mirror must report (False, <message>),
and not re-raise the exception.
"""
mock_run_git.side_effect = GitError("Git command failed (simulated)")
ctx = MagicMock()
ctx.identifier = "repo-id"
ctx.repo_dir = "/tmp/repo"
ctx.config_mirrors = {"origin": "git@github.com:alice/repo.git"}
ctx.file_mirrors = {}
type(ctx).resolved_mirrors = PropertyMock(return_value={"origin": "git@github.com:alice/repo.git"})
mock_build_context.return_value = ctx
ok, message = _probe_mirror(
"ssh://git@code.cymais.cloud:2201/kevinveenbirkenbach/pkgmgr.git",
"/tmp/some-repo",
mock_probe.return_value = (True, "")
repo = {"provider": "github.com", "account": "alice", "repository": "repo"}
setup_mirrors(
selected_repos=[repo],
repositories_base_dir="/base",
all_repos=[repo],
preview=False,
local=False,
remote=True,
ensure_remote=True,
)
self.assertFalse(ok)
self.assertIn("Git command failed", message)
mock_run_git.assert_called_once()
mock_ensure_remote_repository.assert_called_once()
mock_probe.assert_called_once()
@patch("pkgmgr.actions.mirror.setup_cmd.ensure_remote_repository")
@patch("pkgmgr.actions.mirror.setup_cmd.probe_mirror")
@patch("pkgmgr.actions.mirror.setup_cmd.build_context")
def test_setup_mirrors_remote_probes_all_resolved_mirrors(
self,
mock_build_context,
mock_probe,
mock_ensure_remote_repository,
) -> None:
ctx = MagicMock()
ctx.identifier = "repo-id"
ctx.repo_dir = "/tmp/repo"
ctx.config_mirrors = {}
ctx.file_mirrors = {}
type(ctx).resolved_mirrors = PropertyMock(
return_value={
"mirror": "git@github.com:alice/repo.git",
"backup": "ssh://git@git.veen.world:2201/alice/repo.git",
}
)
mock_build_context.return_value = ctx
mock_probe.return_value = (True, "")
repo = {"provider": "github.com", "account": "alice", "repository": "repo"}
setup_mirrors(
selected_repos=[repo],
repositories_base_dir="/base",
all_repos=[repo],
preview=False,
local=False,
remote=True,
ensure_remote=False,
)
mock_ensure_remote_repository.assert_not_called()
self.assertEqual(mock_probe.call_count, 2)
if __name__ == "__main__":

View File

@@ -0,0 +1,77 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import unittest
from pkgmgr.actions.mirror.url_utils import hostport_from_git_url, normalize_provider_host, parse_repo_from_git_url
class TestUrlUtils(unittest.TestCase):
"""
Unit tests for URL parsing helpers used in mirror setup/provisioning.
"""
def test_hostport_from_git_url_ssh_url_with_port(self) -> None:
host, port = hostport_from_git_url("ssh://git@code.example.org:2201/alice/repo.git")
self.assertEqual(host, "code.example.org")
self.assertEqual(port, "2201")
def test_hostport_from_git_url_https_url_no_port(self) -> None:
host, port = hostport_from_git_url("https://github.com/alice/repo.git")
self.assertEqual(host, "github.com")
self.assertIsNone(port)
def test_hostport_from_git_url_scp_like(self) -> None:
host, port = hostport_from_git_url("git@github.com:alice/repo.git")
self.assertEqual(host, "github.com")
self.assertIsNone(port)
def test_hostport_from_git_url_empty(self) -> None:
host, port = hostport_from_git_url("")
self.assertEqual(host, "")
self.assertIsNone(port)
def test_normalize_provider_host_strips_port_and_lowercases(self) -> None:
self.assertEqual(normalize_provider_host("GIT.VEEN.WORLD:2201"), "git.veen.world")
def test_normalize_provider_host_ipv6_brackets(self) -> None:
self.assertEqual(normalize_provider_host("[::1]"), "::1")
def test_normalize_provider_host_empty(self) -> None:
self.assertEqual(normalize_provider_host(""), "")
def test_parse_repo_from_git_url_ssh_url(self) -> None:
host, owner, name = parse_repo_from_git_url("ssh://git@code.example.org:2201/alice/repo.git")
self.assertEqual(host, "code.example.org")
self.assertEqual(owner, "alice")
self.assertEqual(name, "repo")
def test_parse_repo_from_git_url_https_url(self) -> None:
host, owner, name = parse_repo_from_git_url("https://github.com/alice/repo.git")
self.assertEqual(host, "github.com")
self.assertEqual(owner, "alice")
self.assertEqual(name, "repo")
def test_parse_repo_from_git_url_scp_like(self) -> None:
host, owner, name = parse_repo_from_git_url("git@github.com:alice/repo.git")
self.assertEqual(host, "github.com")
self.assertEqual(owner, "alice")
self.assertEqual(name, "repo")
def test_parse_repo_from_git_url_best_effort_host_owner_repo(self) -> None:
host, owner, name = parse_repo_from_git_url("git.veen.world/alice/repo.git")
self.assertEqual(host, "git.veen.world")
self.assertEqual(owner, "alice")
self.assertEqual(name, "repo")
def test_parse_repo_from_git_url_missing_owner_repo_returns_none(self) -> None:
host, owner, name = parse_repo_from_git_url("https://github.com/")
self.assertEqual(host, "github.com")
self.assertIsNone(owner)
self.assertIsNone(name)
if __name__ == "__main__":
unittest.main()