Compare commits

..

11 Commits

Author SHA1 Message Date
Kevin Veen-Birkenbach
a20814cb37 Release version 1.8.7
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / lint-shell (push) Has been cancelled
Mark stable commit / lint-python (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
2025-12-19 14:15:47 +01:00
Kevin Veen-Birkenbach
feb5ba267f refactor(release): move file helpers into files package
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / lint-shell (push) Has been cancelled
Mark stable commit / lint-python (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
https://chatgpt.com/share/69454ef4-e038-800f-a14b-4e633e76f241
2025-12-19 14:11:04 +01:00
Kevin Veen-Birkenbach
591be4ef35 test(release): update pyproject version tests for PEP 621 and RuntimeError handling
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / lint-shell (push) Has been cancelled
Mark stable commit / lint-python (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
- Adjust tests to expect RuntimeError instead of SystemExit
- Add coverage for missing [project] section in pyproject.toml
- Keep spec macro %{?dist} intact in test fixtures
- Minor cleanup and reformatting of test cases

https://chatgpt.com/share/69454836-4698-800f-9d19-7e67e8e789d6
2025-12-19 14:06:33 +01:00
Kevin Veen-Birkenbach
3e6ef0fd68 release: fix pyproject.toml version update for PEP 621 projects
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / lint-shell (push) Has been cancelled
Mark stable commit / lint-python (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
Update version handling to correctly modify [project].version in pyproject.toml.
The previous implementation only matched top-level version assignments and
failed for PEP 621 layouts.

- Restrict update to the [project] section
- Allow leading whitespace in version lines
- Replace sys.exit() with proper exceptions
- Remove unused sys import

https://chatgpt.com/share/69454836-4698-800f-9d19-7e67e8e789d6
2025-12-19 13:42:26 +01:00
Kevin Veen-Birkenbach
3d5c770def Solved ruff F401
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / lint-shell (push) Has been cancelled
Mark stable commit / lint-python (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
2025-12-18 19:16:15 +01:00
Kevin Veen-Birkenbach
f4339a746a executet 'ruff format --check .'
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / lint-shell (push) Has been cancelled
Mark stable commit / lint-python (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
2025-12-18 14:04:44 +01:00
Kevin Veen-Birkenbach
763f02a9a4 Release version 1.8.6
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / lint-shell (push) Has been cancelled
Mark stable commit / lint-python (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
2025-12-17 23:50:31 +01:00
Kevin Veen-Birkenbach
2eec873a17 Solved Debian Bug
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / lint-shell (push) Has been cancelled
Mark stable commit / lint-python (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
https://chatgpt.com/share/69432655-a948-800f-8c0d-353921cdf644
2025-12-17 23:29:04 +01:00
Kevin Veen-Birkenbach
17ee947930 ci: pass NIX_CONFIG with GitHub token into all test containers
- Add NIX_CONFIG with GitHub access token to all CI test workflows
- Export NIX_CONFIG in Makefile for propagation to test scripts
- Forward NIX_CONFIG explicitly into all docker run invocations
- Prevent GitHub API rate limit errors during Nix-based tests

https://chatgpt.com/share/69432655-a948-800f-8c0d-353921cdf644
2025-12-17 23:29:04 +01:00
Kevin Veen-Birkenbach
b989bdd4eb Release version 1.8.5 2025-12-17 23:29:04 +01:00
Kevin Veen-Birkenbach
c4da8368d8 --- Release Error --- 2025-12-17 23:28:45 +01:00
183 changed files with 1944 additions and 1246 deletions

View File

@@ -11,7 +11,9 @@ jobs:
fail-fast: false
matrix:
distro: [arch, debian, ubuntu, fedora, centos]
env:
NIX_CONFIG: |
access-tokens = github.com=${{ secrets.GITHUB_TOKEN }}
steps:
- name: Checkout repository
uses: actions/checkout@v4

View File

@@ -12,7 +12,9 @@ jobs:
fail-fast: false
matrix:
distro: [arch, debian, ubuntu, fedora, centos]
env:
NIX_CONFIG: |
access-tokens = github.com=${{ secrets.GITHUB_TOKEN }}
steps:
- name: Checkout repository
uses: actions/checkout@v4

View File

@@ -11,7 +11,9 @@ jobs:
fail-fast: false
matrix:
distro: [arch, debian, ubuntu, fedora, centos]
env:
NIX_CONFIG: |
access-tokens = github.com=${{ secrets.GITHUB_TOKEN }}
steps:
- name: Checkout repository
uses: actions/checkout@v4

View File

@@ -7,7 +7,9 @@ jobs:
test-integration:
runs-on: ubuntu-latest
timeout-minutes: 30
env:
NIX_CONFIG: |
access-tokens = github.com=${{ secrets.GITHUB_TOKEN }}
steps:
- name: Checkout repository
uses: actions/checkout@v4

View File

@@ -7,7 +7,9 @@ jobs:
test-unit:
runs-on: ubuntu-latest
timeout-minutes: 30
env:
NIX_CONFIG: |
access-tokens = github.com=${{ secrets.GITHUB_TOKEN }}
steps:
- name: Checkout repository
uses: actions/checkout@v4

View File

@@ -11,7 +11,9 @@ jobs:
fail-fast: false
matrix:
distro: [arch, debian, ubuntu, fedora, centos]
env:
NIX_CONFIG: |
access-tokens = github.com=${{ secrets.GITHUB_TOKEN }}
steps:
- name: Checkout repository
uses: actions/checkout@v4
@@ -19,13 +21,11 @@ jobs:
- name: Show Docker version
run: docker version
# 🔹 BUILD virgin image if missing
- name: Build virgin container (${{ matrix.distro }})
run: |
set -euo pipefail
PKGMGR_DISTRO="${{ matrix.distro }}" make build-missing-virgin
# 🔹 RUN test inside virgin image
- name: Virgin ${{ matrix.distro }} pkgmgr test (root)
run: |
set -euo pipefail
@@ -34,6 +34,7 @@ jobs:
-v "$PWD":/opt/src/pkgmgr \
-v pkgmgr_repos:/root/Repositories \
-v pkgmgr_pip_cache:/root/.cache/pip \
-e NIX_CONFIG="${NIX_CONFIG}" \
-w /opt/src/pkgmgr \
"pkgmgr-${{ matrix.distro }}-virgin" \
bash -lc '

View File

@@ -11,7 +11,9 @@ jobs:
fail-fast: false
matrix:
distro: [arch, debian, ubuntu, fedora, centos]
env:
NIX_CONFIG: |
access-tokens = github.com=${{ secrets.GITHUB_TOKEN }}
steps:
- name: Checkout repository
uses: actions/checkout@v4
@@ -19,19 +21,18 @@ jobs:
- name: Show Docker version
run: docker version
# 🔹 BUILD virgin image if missing
- name: Build virgin container (${{ matrix.distro }})
run: |
set -euo pipefail
PKGMGR_DISTRO="${{ matrix.distro }}" make build-missing-virgin
# 🔹 RUN test inside virgin image as non-root
- name: Virgin ${{ matrix.distro }} pkgmgr test (user)
run: |
set -euo pipefail
docker run --rm \
-v "$PWD":/opt/src/pkgmgr \
-e NIX_CONFIG="${NIX_CONFIG}" \
-w /opt/src/pkgmgr \
"pkgmgr-${{ matrix.distro }}-virgin" \
bash -lc '

View File

@@ -1,3 +1,15 @@
## [1.8.7] - 2025-12-19
* * **Release version updates now correctly modify ***pyproject.toml*** files that follow PEP 621**, ensuring the ***[project].version*** field is updated as expected.
* **Invalid or incomplete ***pyproject.toml*** files are now handled gracefully** with clear error messages instead of abrupt process termination.
* **RPM spec files remain compatible during releases**: existing macros such as ***%{?dist}*** are preserved and no longer accidentally modified.
## [1.8.6] - 2025-12-17
* Prevent Rate Limits during GitHub Nix Setups
## [1.8.5] - 2025-12-17
* * Clearer Git error handling, especially when a directory is not a Git repository.

View File

@@ -10,6 +10,10 @@ DISTROS ?= arch debian ubuntu fedora centos
PKGMGR_DISTRO ?= arch
export PKGMGR_DISTRO
# Nix Config Variable (To avoid rate limit)
NIX_CONFIG ?=
export NIX_CONFIG
# ------------------------------------------------------------
# Base images
# (kept for documentation/reference; actual build logic is in scripts/build)

View File

@@ -32,7 +32,7 @@
rec {
pkgmgr = pyPkgs.buildPythonApplication {
pname = "package-manager";
version = "1.8.5";
version = "1.8.7";
# Use the git repo as source
src = ./.;

View File

@@ -1,7 +1,7 @@
# Maintainer: Kevin Veen-Birkenbach <info@veen.world>
pkgname=package-manager
pkgver=1.8.5
pkgver=1.8.7
pkgrel=1
pkgdesc="Local-flake wrapper for Kevin's package-manager (Nix-based)."
arch=('any')

View File

@@ -1,3 +1,17 @@
package-manager (1.8.7-1) unstable; urgency=medium
* * **Release version updates now correctly modify ***pyproject.toml*** files that follow PEP 621**, ensuring the ***[project].version*** field is updated as expected.
* **Invalid or incomplete ***pyproject.toml*** files are now handled gracefully** with clear error messages instead of abrupt process termination.
* **RPM spec files remain compatible during releases**: existing macros such as ***%{?dist}*** are preserved and no longer accidentally modified.
-- Kevin Veen-Birkenbach <kevin@veen.world> Fri, 19 Dec 2025 14:15:47 +0100
package-manager (1.8.6-1) unstable; urgency=medium
* Prevent Rate Limits during GitHub Nix Setups
-- Kevin Veen-Birkenbach <kevin@veen.world> Wed, 17 Dec 2025 23:50:31 +0100
package-manager (1.8.5-1) unstable; urgency=medium
* * Clearer Git error handling, especially when a directory is not a Git repository.

View File

@@ -1,5 +1,5 @@
Name: package-manager
Version: 1.8.5
Version: 1.8.7
Release: 1%{?dist}
Summary: Wrapper that runs Kevin's package-manager via Nix flake
@@ -74,6 +74,14 @@ echo ">>> package-manager removed. Nix itself was not removed."
/usr/lib/package-manager/
%changelog
* Fri Dec 19 2025 Kevin Veen-Birkenbach <kevin@veen.world> - 1.8.7-1
- * **Release version updates now correctly modify ***pyproject.toml*** files that follow PEP 621**, ensuring the ***[project].version*** field is updated as expected.
* **Invalid or incomplete ***pyproject.toml*** files are now handled gracefully** with clear error messages instead of abrupt process termination.
* **RPM spec files remain compatible during releases**: existing macros such as ***%{?dist}*** are preserved and no longer accidentally modified.
* Wed Dec 17 2025 Kevin Veen-Birkenbach <kevin@veen.world> - 1.8.6-1
- Prevent Rate Limits during GitHub Nix Setups
* Wed Dec 17 2025 Kevin Veen-Birkenbach <kevin@veen.world> - 1.8.5-1
- * Clearer Git error handling, especially when a directory is not a Git repository.
* More reliable repository verification with improved commit and GPG signature checks.

View File

@@ -7,7 +7,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "kpmx"
version = "1.8.5"
version = "1.8.7"
description = "Kevin's package-manager tool (pkgmgr)"
readme = "README.md"
requires-python = ">=3.9"

View File

@@ -11,6 +11,7 @@ docker run --rm \
-v "pkgmgr_nix_cache_${PKGMGR_DISTRO}:/root/.cache/nix" \
-e REINSTALL_PKGMGR=1 \
-e TEST_PATTERN="${TEST_PATTERN}" \
-e NIX_CONFIG="${NIX_CONFIG}" \
--workdir /opt/src/pkgmgr \
"pkgmgr-${PKGMGR_DISTRO}" \
bash -lc '

View File

@@ -14,6 +14,7 @@ docker run --rm \
-v "pkgmgr_nix_cache_${PKGMGR_DISTRO}:/root/.cache/nix" \
--workdir /opt/src/pkgmgr \
-e REINSTALL_PKGMGR=1 \
-e NIX_CONFIG="${NIX_CONFIG}" \
"${IMAGE}" \
bash -lc '
set -euo pipefail

View File

@@ -19,6 +19,7 @@ if OUTPUT=$(docker run --rm \
-e REINSTALL_PKGMGR=1 \
-v "$(pwd):/opt/src/pkgmgr" \
-w /opt/src/pkgmgr \
-e NIX_CONFIG="${NIX_CONFIG}" \
"${IMAGE}" \
bash -lc '
set -euo pipefail

View File

@@ -12,6 +12,7 @@ docker run --rm \
--workdir /opt/src/pkgmgr \
-e REINSTALL_PKGMGR=1 \
-e TEST_PATTERN="${TEST_PATTERN}" \
-e NIX_CONFIG="${NIX_CONFIG}" \
"pkgmgr-${PKGMGR_DISTRO}" \
bash -lc '
set -e;

View File

@@ -12,6 +12,7 @@ docker run --rm \
--workdir /opt/src/pkgmgr \
-e REINSTALL_PKGMGR=1 \
-e TEST_PATTERN="${TEST_PATTERN}" \
-e NIX_CONFIG="${NIX_CONFIG}" \
"pkgmgr-${PKGMGR_DISTRO}" \
bash -lc '
set -e;

View File

@@ -25,12 +25,12 @@ __all__ = ["cli"]
def __getattr__(name: str) -> Any:
"""
Lazily expose ``pkgmgr.cli`` as attribute on the top-level package.
"""
Lazily expose ``pkgmgr.cli`` as attribute on the top-level package.
This keeps ``import pkgmgr`` lightweight while still allowing
``from pkgmgr import cli`` in tests and entry points.
"""
if name == "cli":
return import_module("pkgmgr.cli")
raise AttributeError(f"module 'pkgmgr' has no attribute {name!r}")
This keeps ``import pkgmgr`` lightweight while still allowing
``from pkgmgr import cli`` in tests and entry points.
"""
if name == "cli":
return import_module("pkgmgr.cli")
raise AttributeError(f"module 'pkgmgr' has no attribute {name!r}")

View File

@@ -48,9 +48,13 @@ def close_branch(
# Confirmation
if not force:
answer = input(
f"Merge branch '{name}' into '{target_base}' and delete it afterwards? (y/N): "
).strip().lower()
answer = (
input(
f"Merge branch '{name}' into '{target_base}' and delete it afterwards? (y/N): "
)
.strip()
.lower()
)
if answer != "y":
print("Aborted closing branch.")
return

View File

@@ -41,9 +41,13 @@ def drop_branch(
# Confirmation
if not force:
answer = input(
f"Delete branch '{name}' locally and on origin? This is destructive! (y/N): "
).strip().lower()
answer = (
input(
f"Delete branch '{name}' locally and on origin? This is destructive! (y/N): "
)
.strip()
.lower()
)
if answer != "y":
print("Aborted dropping branch.")
return

View File

@@ -2,14 +2,17 @@ import yaml
import os
from pkgmgr.core.config.save import save_user_config
def interactive_add(config,USER_CONFIG_PATH:str):
def interactive_add(config, USER_CONFIG_PATH: str):
"""Interactively prompt the user to add a new repository entry to the user config."""
print("Adding a new repository configuration entry.")
new_entry = {}
new_entry["provider"] = input("Provider (e.g., github.com): ").strip()
new_entry["account"] = input("Account (e.g., yourusername): ").strip()
new_entry["repository"] = input("Repository name (e.g., mytool): ").strip()
new_entry["command"] = input("Command (optional, leave blank to auto-detect): ").strip()
new_entry["command"] = input(
"Command (optional, leave blank to auto-detect): "
).strip()
new_entry["description"] = input("Description (optional): ").strip()
new_entry["replacement"] = input("Replacement (optional): ").strip()
new_entry["alias"] = input("Alias (optional): ").strip()
@@ -25,12 +28,12 @@ def interactive_add(config,USER_CONFIG_PATH:str):
confirm = input("Add this entry to user config? (y/N): ").strip().lower()
if confirm == "y":
if os.path.exists(USER_CONFIG_PATH):
with open(USER_CONFIG_PATH, 'r') as f:
with open(USER_CONFIG_PATH, "r") as f:
user_config = yaml.safe_load(f) or {}
else:
user_config = {"repositories": []}
user_config.setdefault("repositories", [])
user_config["repositories"].append(new_entry)
save_user_config(user_config,USER_CONFIG_PATH)
save_user_config(user_config, USER_CONFIG_PATH)
else:
print("Entry not added.")

View File

@@ -107,11 +107,15 @@ def config_init(
# Already known?
if key in default_keys:
skipped += 1
print(f"[SKIP] (defaults) {provider}/{account}/{repo_name}")
print(
f"[SKIP] (defaults) {provider}/{account}/{repo_name}"
)
continue
if key in existing_keys:
skipped += 1
print(f"[SKIP] (user-config) {provider}/{account}/{repo_name}")
print(
f"[SKIP] (user-config) {provider}/{account}/{repo_name}"
)
continue
print(f"[ADD] {provider}/{account}/{repo_name}")
@@ -121,7 +125,9 @@ def config_init(
if verified_commit:
print(f"[INFO] Latest commit: {verified_commit}")
else:
print("[WARN] Could not read commit (not a git repo or no commits).")
print(
"[WARN] Could not read commit (not a git repo or no commits)."
)
entry: Dict[str, Any] = {
"provider": provider,

View File

@@ -1,6 +1,7 @@
import yaml
from pkgmgr.core.config.load import load_config
def show_config(selected_repos, user_config_path, full_config=False):
"""Display configuration for one or more repositories, or the entire merged config."""
if full_config:
@@ -8,7 +9,9 @@ def show_config(selected_repos, user_config_path, full_config=False):
print(yaml.dump(merged, default_flow_style=False))
else:
for repo in selected_repos:
identifier = f'{repo.get("provider")}/{repo.get("account")}/{repo.get("repository")}'
identifier = (
f"{repo.get('provider')}/{repo.get('account')}/{repo.get('repository')}"
)
print(f"Repository: {identifier}")
for key, value in repo.items():
print(f" {key}: {value}")

View File

@@ -66,10 +66,7 @@ def _ensure_repo_dir(
repo_dir = get_repo_dir(repositories_base_dir, repo)
if not os.path.exists(repo_dir):
print(
f"Repository directory '{repo_dir}' does not exist. "
"Cloning it now..."
)
print(f"Repository directory '{repo_dir}' does not exist. Cloning it now...")
clone_repos(
[repo],
repositories_base_dir,
@@ -79,10 +76,7 @@ def _ensure_repo_dir(
clone_mode,
)
if not os.path.exists(repo_dir):
print(
f"Cloning failed for repository {identifier}. "
"Skipping installation."
)
print(f"Cloning failed for repository {identifier}. Skipping installation.")
return None
return repo_dir
@@ -115,7 +109,9 @@ def _verify_repo(
if silent:
# Non-interactive mode: continue with a warning.
print(f"[Warning] Continuing despite verification failure for {identifier} (--silent).")
print(
f"[Warning] Continuing despite verification failure for {identifier} (--silent)."
)
else:
choice = input("Continue anyway? [y/N]: ").strip().lower()
if choice != "y":
@@ -232,12 +228,16 @@ def install_repos(
code = exc.code if isinstance(exc.code, int) else str(exc.code)
failures.append((identifier, f"installer failed (exit={code})"))
if not quiet:
print(f"[Warning] install: repository {identifier} failed (exit={code}). Continuing...")
print(
f"[Warning] install: repository {identifier} failed (exit={code}). Continuing..."
)
continue
except Exception as exc:
failures.append((identifier, f"unexpected error: {exc}"))
if not quiet:
print(f"[Warning] install: repository {identifier} hit an unexpected error: {exc}. Continuing...")
print(
f"[Warning] install: repository {identifier} hit an unexpected error: {exc}. Continuing..."
)
continue
if failures and emit_summary and not quiet:

View File

@@ -14,6 +14,10 @@ from pkgmgr.actions.install.installers.python import PythonInstaller # noqa: F4
from pkgmgr.actions.install.installers.makefile import MakefileInstaller # noqa: F401
# OS-specific installers
from pkgmgr.actions.install.installers.os_packages.arch_pkgbuild import ArchPkgbuildInstaller # noqa: F401
from pkgmgr.actions.install.installers.os_packages.debian_control import DebianControlInstaller # noqa: F401
from pkgmgr.actions.install.installers.os_packages.arch_pkgbuild import (
ArchPkgbuildInstaller as ArchPkgbuildInstaller,
) # noqa: F401
from pkgmgr.actions.install.installers.os_packages.debian_control import (
DebianControlInstaller as DebianControlInstaller,
) # noqa: F401
from pkgmgr.actions.install.installers.os_packages.rpm_spec import RpmSpecInstaller # noqa: F401

View File

@@ -41,7 +41,9 @@ class BaseInstaller(ABC):
return caps
for matcher in CAPABILITY_MATCHERS:
if matcher.applies_to_layer(self.layer) and matcher.is_provided(ctx, self.layer):
if matcher.applies_to_layer(self.layer) and matcher.is_provided(
ctx, self.layer
):
caps.add(matcher.name)
return caps

View File

@@ -16,7 +16,9 @@ class MakefileInstaller(BaseInstaller):
def supports(self, ctx: RepoContext) -> bool:
if os.environ.get("PKGMGR_DISABLE_MAKEFILE_INSTALLER") == "1":
if not ctx.quiet:
print("[INFO] PKGMGR_DISABLE_MAKEFILE_INSTALLER=1 skipping MakefileInstaller.")
print(
"[INFO] PKGMGR_DISABLE_MAKEFILE_INSTALLER=1 skipping MakefileInstaller."
)
return False
makefile_path = os.path.join(ctx.repo_dir, self.MAKEFILE_NAME)
@@ -46,7 +48,9 @@ class MakefileInstaller(BaseInstaller):
return
if not ctx.quiet:
print(f"[pkgmgr] Running make install for {ctx.identifier} (MakefileInstaller)")
print(
f"[pkgmgr] Running make install for {ctx.identifier} (MakefileInstaller)"
)
run_command("make install", cwd=ctx.repo_dir, preview=ctx.preview)

View File

@@ -57,7 +57,9 @@ class NixConflictResolver:
# 3) Fallback: output-name based lookup (also covers nix suggesting: `nix profile remove pkgmgr`)
if not tokens:
tokens = self._profile.find_remove_tokens_for_output(ctx, self._runner, output)
tokens = self._profile.find_remove_tokens_for_output(
ctx, self._runner, output
)
if tokens:
if not quiet:
@@ -94,7 +96,9 @@ class NixConflictResolver:
continue
if not quiet:
print("[nix] conflict detected but could not resolve profile entries to remove.")
print(
"[nix] conflict detected but could not resolve profile entries to remove."
)
return False
return False

View File

@@ -75,7 +75,9 @@ class NixFlakeInstaller(BaseInstaller):
# Core install path
# ---------------------------------------------------------------------
def _install_only(self, ctx: "RepoContext", output: str, allow_failure: bool) -> None:
def _install_only(
self, ctx: "RepoContext", output: str, allow_failure: bool
) -> None:
install_cmd = f"nix profile install {self._installable(ctx, output)}"
if not ctx.quiet:
@@ -96,7 +98,9 @@ class NixFlakeInstaller(BaseInstaller):
output=output,
):
if not ctx.quiet:
print(f"[nix] output '{output}' successfully installed after conflict cleanup.")
print(
f"[nix] output '{output}' successfully installed after conflict cleanup."
)
return
if not ctx.quiet:
@@ -107,20 +111,26 @@ class NixFlakeInstaller(BaseInstaller):
# If indices are supported, try legacy index-upgrade path.
if self._indices_supported is not False:
indices = self._profile.find_installed_indices_for_output(ctx, self._runner, output)
indices = self._profile.find_installed_indices_for_output(
ctx, self._runner, output
)
upgraded = False
for idx in indices:
if self._upgrade_index(ctx, idx):
upgraded = True
if not ctx.quiet:
print(f"[nix] output '{output}' successfully upgraded (index {idx}).")
print(
f"[nix] output '{output}' successfully upgraded (index {idx})."
)
if upgraded:
return
if indices and not ctx.quiet:
print(f"[nix] upgrade failed; removing indices {indices} and reinstalling '{output}'.")
print(
f"[nix] upgrade failed; removing indices {indices} and reinstalling '{output}'."
)
for idx in indices:
self._remove_index(ctx, idx)
@@ -139,7 +149,9 @@ class NixFlakeInstaller(BaseInstaller):
print(f"[nix] output '{output}' successfully re-installed.")
return
print(f"[ERROR] Failed to install Nix flake output '{output}' (exit {final.returncode})")
print(
f"[ERROR] Failed to install Nix flake output '{output}' (exit {final.returncode})"
)
if not allow_failure:
raise SystemExit(final.returncode)
@@ -149,7 +161,9 @@ class NixFlakeInstaller(BaseInstaller):
# force_update path
# ---------------------------------------------------------------------
def _force_upgrade_output(self, ctx: "RepoContext", output: str, allow_failure: bool) -> None:
def _force_upgrade_output(
self, ctx: "RepoContext", output: str, allow_failure: bool
) -> None:
# Prefer token path if indices unsupported (new nix)
if self._indices_supported is False:
self._remove_tokens_for_output(ctx, output)
@@ -158,14 +172,18 @@ class NixFlakeInstaller(BaseInstaller):
print(f"[nix] output '{output}' successfully upgraded.")
return
indices = self._profile.find_installed_indices_for_output(ctx, self._runner, output)
indices = self._profile.find_installed_indices_for_output(
ctx, self._runner, output
)
upgraded_any = False
for idx in indices:
if self._upgrade_index(ctx, idx):
upgraded_any = True
if not ctx.quiet:
print(f"[nix] output '{output}' successfully upgraded (index {idx}).")
print(
f"[nix] output '{output}' successfully upgraded (index {idx})."
)
if upgraded_any:
if not ctx.quiet:
@@ -173,7 +191,9 @@ class NixFlakeInstaller(BaseInstaller):
return
if indices and not ctx.quiet:
print(f"[nix] upgrade failed; removing indices {indices} and reinstalling '{output}'.")
print(
f"[nix] upgrade failed; removing indices {indices} and reinstalling '{output}'."
)
for idx in indices:
self._remove_index(ctx, idx)
@@ -223,7 +243,9 @@ class NixFlakeInstaller(BaseInstaller):
return
if not ctx.quiet:
print(f"[nix] indices unsupported; removing by token(s): {', '.join(tokens)}")
print(
f"[nix] indices unsupported; removing by token(s): {', '.join(tokens)}"
)
for t in tokens:
self._runner.run(ctx, f"nix profile remove {t}", allow_failure=True)

View File

@@ -101,7 +101,9 @@ class NixProfileInspector:
data = self.list_json(ctx, runner)
entries = normalize_elements(data)
tokens: List[str] = [out] # critical: matches nix's own suggestion for conflicts
tokens: List[str] = [
out
] # critical: matches nix's own suggestion for conflicts
for e in entries:
if entry_matches_output(e, out):

View File

@@ -48,7 +48,9 @@ class NixProfileListReader:
return uniq
def indices_matching_store_prefixes(self, ctx: "RepoContext", prefixes: List[str]) -> List[int]:
def indices_matching_store_prefixes(
self, ctx: "RepoContext", prefixes: List[str]
) -> List[int]:
prefixes = [self._store_prefix(p) for p in prefixes if p]
prefixes = [p for p in prefixes if p]
if not prefixes:

View File

@@ -11,6 +11,7 @@ if TYPE_CHECKING:
from pkgmgr.actions.install.context import RepoContext
from .runner import CommandRunner
@dataclass(frozen=True)
class RetryPolicy:
max_attempts: int = 7
@@ -35,13 +36,19 @@ class GitHubRateLimitRetry:
install_cmd: str,
) -> RunResult:
quiet = bool(getattr(ctx, "quiet", False))
delays = list(self._fibonacci_backoff(self._policy.base_delay_seconds, self._policy.max_attempts))
delays = list(
self._fibonacci_backoff(
self._policy.base_delay_seconds, self._policy.max_attempts
)
)
last: RunResult | None = None
for attempt, base_delay in enumerate(delays, start=1):
if not quiet:
print(f"[nix] attempt {attempt}/{self._policy.max_attempts}: {install_cmd}")
print(
f"[nix] attempt {attempt}/{self._policy.max_attempts}: {install_cmd}"
)
res = runner.run(ctx, install_cmd, allow_failure=True)
last = res
@@ -56,7 +63,9 @@ class GitHubRateLimitRetry:
if attempt >= self._policy.max_attempts:
break
jitter = random.randint(self._policy.jitter_seconds_min, self._policy.jitter_seconds_max)
jitter = random.randint(
self._policy.jitter_seconds_min, self._policy.jitter_seconds_max
)
wait_time = base_delay + jitter
if not quiet:
@@ -67,7 +76,11 @@ class GitHubRateLimitRetry:
time.sleep(wait_time)
return last if last is not None else RunResult(returncode=1, stdout="", stderr="nix install retry failed")
return (
last
if last is not None
else RunResult(returncode=1, stdout="", stderr="nix install retry failed")
)
@staticmethod
def _is_github_rate_limit_error(text: str) -> bool:

View File

@@ -9,6 +9,7 @@ from .types import RunResult
if TYPE_CHECKING:
from pkgmgr.actions.install.context import RepoContext
class CommandRunner:
"""
Executes commands (shell=True) inside a repository directory (if provided).
@@ -40,7 +41,9 @@ class CommandRunner:
raise
return RunResult(returncode=1, stdout="", stderr=str(e))
res = RunResult(returncode=p.returncode, stdout=p.stdout or "", stderr=p.stderr or "")
res = RunResult(
returncode=p.returncode, stdout=p.stdout or "", stderr=p.stderr or ""
)
if res.returncode != 0 and not quiet:
self._print_compact_failure(res)

View File

@@ -20,7 +20,9 @@ class NixConflictTextParser:
tokens: List[str] = []
for m in pat.finditer(text or ""):
t = (m.group(1) or "").strip()
if (t.startswith("'") and t.endswith("'")) or (t.startswith('"') and t.endswith('"')):
if (t.startswith("'") and t.endswith("'")) or (
t.startswith('"') and t.endswith('"')
):
t = t[1:-1]
if t:
tokens.append(t)

View File

@@ -14,7 +14,9 @@ class PythonInstaller(BaseInstaller):
def supports(self, ctx: RepoContext) -> bool:
if os.environ.get("PKGMGR_DISABLE_PYTHON_INSTALLER") == "1":
print("[INFO] PythonInstaller disabled via PKGMGR_DISABLE_PYTHON_INSTALLER.")
print(
"[INFO] PythonInstaller disabled via PKGMGR_DISABLE_PYTHON_INSTALLER."
)
return False
return os.path.exists(os.path.join(ctx.repo_dir, "pyproject.toml"))

View File

@@ -132,7 +132,11 @@ class InstallationPipeline:
continue
if not quiet:
if ctx.force_update and state.layer is not None and installer_layer == state.layer:
if (
ctx.force_update
and state.layer is not None
and installer_layer == state.layer
):
print(
f"[pkgmgr] Running installer {installer.__class__.__name__} "
f"for {identifier} in '{repo_dir}' (upgrade requested)..."

View File

@@ -16,6 +16,7 @@ from .types import MirrorMap, Repository
# Helpers
# -----------------------------------------------------------------------------
def _repo_key(repo: Repository) -> Tuple[str, str, str]:
"""
Normalised key for identifying a repository in config files.
@@ -47,6 +48,7 @@ def _load_user_config(path: str) -> Dict[str, object]:
# Main merge command
# -----------------------------------------------------------------------------
def merge_mirrors(
selected_repos: List[Repository],
repositories_base_dir: str,

View File

@@ -66,7 +66,9 @@ def _setup_remote_mirrors_for_repo(
# Probe only git URLs (do not try ls-remote against PyPI etc.)
# If there are no mirrors at all, probe the primary git URL.
git_mirrors = {k: v for k, v in ctx.resolved_mirrors.items() if _is_git_remote_url(v)}
git_mirrors = {
k: v for k, v in ctx.resolved_mirrors.items() if _is_git_remote_url(v)
}
if not git_mirrors:
primary = determine_primary_remote_url(repo, ctx)

View File

@@ -17,7 +17,7 @@ def hostport_from_git_url(url: str) -> Tuple[str, Optional[str]]:
netloc = netloc.split("@", 1)[1]
if netloc.startswith("[") and "]" in netloc:
host = netloc[1:netloc.index("]")]
host = netloc[1 : netloc.index("]")]
rest = netloc[netloc.index("]") + 1 :]
port = rest[1:] if rest.startswith(":") else None
return host.strip(), (port.strip() if port else None)
@@ -43,7 +43,7 @@ def normalize_provider_host(host: str) -> str:
return ""
if host.startswith("[") and "]" in host:
host = host[1:host.index("]")]
host = host[1 : host.index("]")]
if ":" in host and host.count(":") == 1:
host = host.rsplit(":", 1)[0]

View File

@@ -4,7 +4,16 @@ from pkgmgr.core.repository.dir import get_repo_dir
from pkgmgr.core.command.run import run_command
import sys
def exec_proxy_command(proxy_prefix: str, selected_repos, repositories_base_dir, all_repos, proxy_command: str, extra_args, preview: bool):
def exec_proxy_command(
proxy_prefix: str,
selected_repos,
repositories_base_dir,
all_repos,
proxy_command: str,
extra_args,
preview: bool,
):
"""Execute a given proxy command with extra arguments for each repository."""
error_repos = []
max_exit_code = 0
@@ -22,7 +31,9 @@ def exec_proxy_command(proxy_prefix: str, selected_repos, repositories_base_dir,
try:
run_command(full_cmd, cwd=repo_dir, preview=preview)
except SystemExit as e:
print(f"[ERROR] Command failed in {repo_identifier} with exit code {e.code}.")
print(
f"[ERROR] Command failed in {repo_identifier} with exit code {e.code}."
)
error_repos.append((repo_identifier, e.code))
max_exit_code = max(max_exit_code, e.code)

View File

@@ -1,519 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
File and metadata update helpers for the release workflow.
Responsibilities:
- Update pyproject.toml with the new version.
- Update flake.nix, PKGBUILD, RPM spec files where present.
- Prepend release entries to CHANGELOG.md.
- Maintain distribution-specific changelog files:
* debian/changelog
* RPM spec %changelog section
including maintainer metadata where applicable.
"""
from __future__ import annotations
import os
import re
import subprocess
import sys
import tempfile
from datetime import date, datetime
from typing import Optional, Tuple
from pkgmgr.core.git.queries import get_config_value
# ---------------------------------------------------------------------------
# Editor helper for interactive changelog messages
# ---------------------------------------------------------------------------
def _open_editor_for_changelog(initial_message: Optional[str] = None) -> str:
"""
Open $EDITOR (fallback 'nano') so the user can enter a changelog message.
The temporary file is pre-filled with commented instructions and an
optional initial_message. Lines starting with '#' are ignored when the
message is read back.
Returns the final message (may be empty string if user leaves it blank).
"""
editor = os.environ.get("EDITOR", "nano")
with tempfile.NamedTemporaryFile(
mode="w+",
delete=False,
encoding="utf-8",
) as tmp:
tmp_path = tmp.name
tmp.write(
"# Write the changelog entry for this release.\n"
"# Lines starting with '#' will be ignored.\n"
"# Empty result will fall back to a generic message.\n\n"
)
if initial_message:
tmp.write(initial_message.strip() + "\n")
tmp.flush()
try:
subprocess.call([editor, tmp_path])
except FileNotFoundError:
print(
f"[WARN] Editor {editor!r} not found; proceeding without "
"interactive changelog message."
)
try:
with open(tmp_path, "r", encoding="utf-8") as f:
content = f.read()
finally:
try:
os.remove(tmp_path)
except OSError:
pass
lines = [line for line in content.splitlines() if not line.strip().startswith("#")]
return "\n".join(lines).strip()
# ---------------------------------------------------------------------------
# File update helpers (pyproject + extra packaging + changelog)
# ---------------------------------------------------------------------------
def update_pyproject_version(
pyproject_path: str,
new_version: str,
preview: bool = False,
) -> None:
"""
Update the version in pyproject.toml with the new version.
The function looks for a line matching:
version = "X.Y.Z"
and replaces the version part with the given new_version string.
If the file does not exist, it is skipped without failing the release.
"""
if not os.path.exists(pyproject_path):
print(
f"[INFO] pyproject.toml not found at: {pyproject_path}, "
"skipping version update."
)
return
try:
with open(pyproject_path, "r", encoding="utf-8") as f:
content = f.read()
except OSError as exc:
print(
f"[WARN] Could not read pyproject.toml at {pyproject_path}: {exc}. "
"Skipping version update."
)
return
pattern = r'^(version\s*=\s*")([^"]+)(")'
new_content, count = re.subn(
pattern,
lambda m: f'{m.group(1)}{new_version}{m.group(3)}',
content,
flags=re.MULTILINE,
)
if count == 0:
print("[ERROR] Could not find version line in pyproject.toml")
sys.exit(1)
if preview:
print(f"[PREVIEW] Would update pyproject.toml version to {new_version}")
return
with open(pyproject_path, "w", encoding="utf-8") as f:
f.write(new_content)
print(f"Updated pyproject.toml version to {new_version}")
def update_flake_version(
flake_path: str,
new_version: str,
preview: bool = False,
) -> None:
"""
Update the version in flake.nix, if present.
"""
if not os.path.exists(flake_path):
print("[INFO] flake.nix not found, skipping.")
return
try:
with open(flake_path, "r", encoding="utf-8") as f:
content = f.read()
except Exception as exc:
print(f"[WARN] Could not read flake.nix: {exc}")
return
pattern = r'(version\s*=\s*")([^"]+)(")'
new_content, count = re.subn(
pattern,
lambda m: f'{m.group(1)}{new_version}{m.group(3)}',
content,
)
if count == 0:
print("[WARN] No version assignment found in flake.nix, skipping.")
return
if preview:
print(f"[PREVIEW] Would update flake.nix version to {new_version}")
return
with open(flake_path, "w", encoding="utf-8") as f:
f.write(new_content)
print(f"Updated flake.nix version to {new_version}")
def update_pkgbuild_version(
pkgbuild_path: str,
new_version: str,
preview: bool = False,
) -> None:
"""
Update the version in PKGBUILD, if present.
Expects:
pkgver=1.2.3
pkgrel=1
"""
if not os.path.exists(pkgbuild_path):
print("[INFO] PKGBUILD not found, skipping.")
return
try:
with open(pkgbuild_path, "r", encoding="utf-8") as f:
content = f.read()
except Exception as exc:
print(f"[WARN] Could not read PKGBUILD: {exc}")
return
ver_pattern = r"^(pkgver\s*=\s*)(.+)$"
new_content, ver_count = re.subn(
ver_pattern,
lambda m: f"{m.group(1)}{new_version}",
content,
flags=re.MULTILINE,
)
if ver_count == 0:
print("[WARN] No pkgver line found in PKGBUILD.")
new_content = content
rel_pattern = r"^(pkgrel\s*=\s*)(.+)$"
new_content, rel_count = re.subn(
rel_pattern,
lambda m: f"{m.group(1)}1",
new_content,
flags=re.MULTILINE,
)
if rel_count == 0:
print("[WARN] No pkgrel line found in PKGBUILD.")
if preview:
print(f"[PREVIEW] Would update PKGBUILD to pkgver={new_version}, pkgrel=1")
return
with open(pkgbuild_path, "w", encoding="utf-8") as f:
f.write(new_content)
print(f"Updated PKGBUILD to pkgver={new_version}, pkgrel=1")
def update_spec_version(
spec_path: str,
new_version: str,
preview: bool = False,
) -> None:
"""
Update the version in an RPM spec file, if present.
"""
if not os.path.exists(spec_path):
print("[INFO] RPM spec file not found, skipping.")
return
try:
with open(spec_path, "r", encoding="utf-8") as f:
content = f.read()
except Exception as exc:
print(f"[WARN] Could not read spec file: {exc}")
return
ver_pattern = r"^(Version:\s*)(.+)$"
new_content, ver_count = re.subn(
ver_pattern,
lambda m: f"{m.group(1)}{new_version}",
content,
flags=re.MULTILINE,
)
if ver_count == 0:
print("[WARN] No 'Version:' line found in spec file.")
rel_pattern = r"^(Release:\s*)(.+)$"
def _release_repl(m: re.Match[str]) -> str: # type: ignore[name-defined]
rest = m.group(2).strip()
match = re.match(r"^(\d+)(.*)$", rest)
if match:
suffix = match.group(2)
else:
suffix = ""
return f"{m.group(1)}1{suffix}"
new_content, rel_count = re.subn(
rel_pattern,
_release_repl,
new_content,
flags=re.MULTILINE,
)
if rel_count == 0:
print("[WARN] No 'Release:' line found in spec file.")
if preview:
print(
"[PREVIEW] Would update spec file "
f"{os.path.basename(spec_path)} to Version: {new_version}, Release: 1..."
)
return
with open(spec_path, "w", encoding="utf-8") as f:
f.write(new_content)
print(
f"Updated spec file {os.path.basename(spec_path)} "
f"to Version: {new_version}, Release: 1..."
)
def update_changelog(
changelog_path: str,
new_version: str,
message: Optional[str] = None,
preview: bool = False,
) -> str:
"""
Prepend a new release section to CHANGELOG.md with the new version,
current date, and a message.
"""
today = date.today().isoformat()
if message is None:
if preview:
message = "Automated release."
else:
print(
"\n[INFO] No release message provided, opening editor for "
"changelog entry...\n"
)
editor_message = _open_editor_for_changelog()
if not editor_message:
message = "Automated release."
else:
message = editor_message
header = f"## [{new_version}] - {today}\n"
header += f"\n* {message}\n\n"
if os.path.exists(changelog_path):
try:
with open(changelog_path, "r", encoding="utf-8") as f:
changelog = f.read()
except Exception as exc:
print(f"[WARN] Could not read existing CHANGELOG.md: {exc}")
changelog = ""
else:
changelog = ""
new_changelog = header + "\n" + changelog if changelog else header
print("\n================ CHANGELOG ENTRY ================")
print(header.rstrip())
print("=================================================\n")
if preview:
print(f"[PREVIEW] Would prepend new entry for {new_version} to CHANGELOG.md")
return message
with open(changelog_path, "w", encoding="utf-8") as f:
f.write(new_changelog)
print(f"Updated CHANGELOG.md with version {new_version}")
return message
# ---------------------------------------------------------------------------
# Debian changelog helpers (with Git config fallback for maintainer)
# ---------------------------------------------------------------------------
def _get_debian_author() -> Tuple[str, str]:
"""
Determine the maintainer name/email for debian/changelog entries.
"""
name = os.environ.get("DEBFULLNAME")
email = os.environ.get("DEBEMAIL")
if not name:
name = os.environ.get("GIT_AUTHOR_NAME")
if not email:
email = os.environ.get("GIT_AUTHOR_EMAIL")
if not name:
name = get_config_value("user.name")
if not email:
email = get_config_value("user.email")
if not name:
name = "Unknown Maintainer"
if not email:
email = "unknown@example.com"
return name, email
def update_debian_changelog(
debian_changelog_path: str,
package_name: str,
new_version: str,
message: Optional[str] = None,
preview: bool = False,
) -> None:
"""
Prepend a new entry to debian/changelog, if it exists.
"""
if not os.path.exists(debian_changelog_path):
print("[INFO] debian/changelog not found, skipping.")
return
debian_version = f"{new_version}-1"
now = datetime.now().astimezone()
date_str = now.strftime("%a, %d %b %Y %H:%M:%S %z")
author_name, author_email = _get_debian_author()
first_line = f"{package_name} ({debian_version}) unstable; urgency=medium"
body_line = message.strip() if message else f"Automated release {new_version}."
stanza = (
f"{first_line}\n\n"
f" * {body_line}\n\n"
f" -- {author_name} <{author_email}> {date_str}\n\n"
)
if preview:
print(
"[PREVIEW] Would prepend the following stanza to debian/changelog:\n"
f"{stanza}"
)
return
try:
with open(debian_changelog_path, "r", encoding="utf-8") as f:
existing = f.read()
except Exception as exc:
print(f"[WARN] Could not read debian/changelog: {exc}")
existing = ""
new_content = stanza + existing
with open(debian_changelog_path, "w", encoding="utf-8") as f:
f.write(new_content)
print(f"Updated debian/changelog with version {debian_version}")
# ---------------------------------------------------------------------------
# Fedora / RPM spec %changelog helper
# ---------------------------------------------------------------------------
def update_spec_changelog(
spec_path: str,
package_name: str,
new_version: str,
message: Optional[str] = None,
preview: bool = False,
) -> None:
"""
Prepend a new entry to the %changelog section of an RPM spec file,
if present.
Typical RPM-style entry:
* Tue Dec 09 2025 John Doe <john@example.com> - 0.5.1-1
- Your changelog message
"""
if not os.path.exists(spec_path):
print("[INFO] RPM spec file not found, skipping spec changelog update.")
return
try:
with open(spec_path, "r", encoding="utf-8") as f:
content = f.read()
except Exception as exc:
print(f"[WARN] Could not read spec file for changelog update: {exc}")
return
debian_version = f"{new_version}-1"
now = datetime.now().astimezone()
date_str = now.strftime("%a %b %d %Y")
# Reuse Debian maintainer discovery for author name/email.
author_name, author_email = _get_debian_author()
body_line = message.strip() if message else f"Automated release {new_version}."
stanza = (
f"* {date_str} {author_name} <{author_email}> - {debian_version}\n"
f"- {body_line}\n\n"
)
marker = "%changelog"
idx = content.find(marker)
if idx == -1:
# No %changelog section yet: append one at the end.
new_content = content.rstrip() + "\n\n%changelog\n" + stanza
else:
# Insert stanza right after the %changelog line.
before = content[: idx + len(marker)]
after = content[idx + len(marker) :]
new_content = before + "\n" + stanza + after.lstrip("\n")
if preview:
print(
"[PREVIEW] Would update RPM %changelog section with the following "
"stanza:\n"
f"{stanza}"
)
return
try:
with open(spec_path, "w", encoding="utf-8") as f:
f.write(new_content)
except Exception as exc:
print(f"[WARN] Failed to write updated spec changelog section: {exc}")
return
print(
f"Updated RPM %changelog section in {os.path.basename(spec_path)} "
f"for {package_name} {debian_version}"
)

View File

@@ -0,0 +1,35 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Backwards-compatible facade for the release file update helpers.
Implementations live in this package:
pkgmgr.actions.release.files.*
Keep this package stable so existing imports continue to work, e.g.:
from pkgmgr.actions.release.files import update_pyproject_version
"""
from __future__ import annotations
from .editor import _open_editor_for_changelog
from .pyproject import update_pyproject_version
from .flake import update_flake_version
from .pkgbuild import update_pkgbuild_version
from .rpm_spec import update_spec_version
from .changelog_md import update_changelog
from .debian import _get_debian_author, update_debian_changelog
from .rpm_changelog import update_spec_changelog
__all__ = [
"_open_editor_for_changelog",
"update_pyproject_version",
"update_flake_version",
"update_pkgbuild_version",
"update_spec_version",
"update_changelog",
"_get_debian_author",
"update_debian_changelog",
"update_spec_changelog",
]

View File

@@ -0,0 +1,62 @@
from __future__ import annotations
import os
from datetime import date
from typing import Optional
from .editor import _open_editor_for_changelog
def update_changelog(
changelog_path: str,
new_version: str,
message: Optional[str] = None,
preview: bool = False,
) -> str:
"""
Prepend a new release section to CHANGELOG.md with the new version,
current date, and a message.
"""
today = date.today().isoformat()
if message is None:
if preview:
message = "Automated release."
else:
print(
"\n[INFO] No release message provided, opening editor for changelog entry...\n"
)
editor_message = _open_editor_for_changelog()
if not editor_message:
message = "Automated release."
else:
message = editor_message
header = f"## [{new_version}] - {today}\n"
header += f"\n* {message}\n\n"
if os.path.exists(changelog_path):
try:
with open(changelog_path, "r", encoding="utf-8") as f:
changelog = f.read()
except Exception as exc:
print(f"[WARN] Could not read existing CHANGELOG.md: {exc}")
changelog = ""
else:
changelog = ""
new_changelog = header + "\n" + changelog if changelog else header
print("\n================ CHANGELOG ENTRY ================")
print(header.rstrip())
print("=================================================\n")
if preview:
print(f"[PREVIEW] Would prepend new entry for {new_version} to CHANGELOG.md")
return message
with open(changelog_path, "w", encoding="utf-8") as f:
f.write(new_changelog)
print(f"Updated CHANGELOG.md with version {new_version}")
return message

View File

@@ -0,0 +1,74 @@
from __future__ import annotations
import os
from datetime import datetime
from typing import Optional, Tuple
from pkgmgr.core.git.queries import get_config_value
def _get_debian_author() -> Tuple[str, str]:
name = os.environ.get("DEBFULLNAME")
email = os.environ.get("DEBEMAIL")
if not name:
name = os.environ.get("GIT_AUTHOR_NAME")
if not email:
email = os.environ.get("GIT_AUTHOR_EMAIL")
if not name:
name = get_config_value("user.name")
if not email:
email = get_config_value("user.email")
if not name:
name = "Unknown Maintainer"
if not email:
email = "unknown@example.com"
return name, email
def update_debian_changelog(
debian_changelog_path: str,
package_name: str,
new_version: str,
message: Optional[str] = None,
preview: bool = False,
) -> None:
if not os.path.exists(debian_changelog_path):
print("[INFO] debian/changelog not found, skipping.")
return
debian_version = f"{new_version}-1"
now = datetime.now().astimezone()
date_str = now.strftime("%a, %d %b %Y %H:%M:%S %z")
author_name, author_email = _get_debian_author()
first_line = f"{package_name} ({debian_version}) unstable; urgency=medium"
body_line = message.strip() if message else f"Automated release {new_version}."
stanza = (
f"{first_line}\n\n"
f" * {body_line}\n\n"
f" -- {author_name} <{author_email}> {date_str}\n\n"
)
if preview:
print(
"[PREVIEW] Would prepend the following stanza to debian/changelog:\n"
f"{stanza}"
)
return
try:
with open(debian_changelog_path, "r", encoding="utf-8") as f:
existing = f.read()
except Exception as exc:
print(f"[WARN] Could not read debian/changelog: {exc}")
existing = ""
with open(debian_changelog_path, "w", encoding="utf-8") as f:
f.write(stanza + existing)
print(f"Updated debian/changelog with version {debian_version}")

View File

@@ -0,0 +1,45 @@
from __future__ import annotations
import os
import subprocess
import tempfile
from typing import Optional
def _open_editor_for_changelog(initial_message: Optional[str] = None) -> str:
editor = os.environ.get("EDITOR", "nano")
with tempfile.NamedTemporaryFile(
mode="w+",
delete=False,
encoding="utf-8",
) as tmp:
tmp_path = tmp.name
tmp.write(
"# Write the changelog entry for this release.\n"
"# Lines starting with '#' will be ignored.\n"
"# Empty result will fall back to a generic message.\n\n"
)
if initial_message:
tmp.write(initial_message.strip() + "\n")
tmp.flush()
try:
subprocess.call([editor, tmp_path])
except FileNotFoundError:
print(
f"[WARN] Editor {editor!r} not found; proceeding without "
"interactive changelog message."
)
try:
with open(tmp_path, "r", encoding="utf-8") as f:
content = f.read()
finally:
try:
os.remove(tmp_path)
except OSError:
pass
lines = [line for line in content.splitlines() if not line.strip().startswith("#")]
return "\n".join(lines).strip()

View File

@@ -0,0 +1,39 @@
from __future__ import annotations
import os
import re
def update_flake_version(
flake_path: str, new_version: str, preview: bool = False
) -> None:
if not os.path.exists(flake_path):
print("[INFO] flake.nix not found, skipping.")
return
try:
with open(flake_path, "r", encoding="utf-8") as f:
content = f.read()
except Exception as exc:
print(f"[WARN] Could not read flake.nix: {exc}")
return
pattern = r'(version\s*=\s*")([^"]+)(")'
new_content, count = re.subn(
pattern,
lambda m: f"{m.group(1)}{new_version}{m.group(3)}",
content,
)
if count == 0:
print("[WARN] No version found in flake.nix.")
return
if preview:
print(f"[PREVIEW] Would update flake.nix version to {new_version}")
return
with open(flake_path, "w", encoding="utf-8") as f:
f.write(new_content)
print(f"Updated flake.nix version to {new_version}")

View File

@@ -0,0 +1,41 @@
from __future__ import annotations
import os
import re
def update_pkgbuild_version(
pkgbuild_path: str, new_version: str, preview: bool = False
) -> None:
if not os.path.exists(pkgbuild_path):
print("[INFO] PKGBUILD not found, skipping.")
return
try:
with open(pkgbuild_path, "r", encoding="utf-8") as f:
content = f.read()
except Exception as exc:
print(f"[WARN] Could not read PKGBUILD: {exc}")
return
content, _ = re.subn(
r"^(pkgver\s*=\s*)(.+)$",
lambda m: f"{m.group(1)}{new_version}",
content,
flags=re.MULTILINE,
)
content, _ = re.subn(
r"^(pkgrel\s*=\s*)(.+)$",
lambda m: f"{m.group(1)}1",
content,
flags=re.MULTILINE,
)
if preview:
print(f"[PREVIEW] Would update PKGBUILD to pkgver={new_version}, pkgrel=1")
return
with open(pkgbuild_path, "w", encoding="utf-8") as f:
f.write(content)
print(f"Updated PKGBUILD to pkgver={new_version}, pkgrel=1")

View File

@@ -0,0 +1,45 @@
from __future__ import annotations
import os
import re
def update_pyproject_version(
pyproject_path: str, new_version: str, preview: bool = False
) -> None:
if not os.path.exists(pyproject_path):
print(f"[INFO] pyproject.toml not found at: {pyproject_path}, skipping.")
return
try:
with open(pyproject_path, "r", encoding="utf-8") as f:
content = f.read()
except OSError as exc:
print(f"[WARN] Could not read pyproject.toml: {exc}")
return
m = re.search(r"(?ms)^\s*\[project\]\s*$.*?(?=^\s*\[|\Z)", content)
if not m:
raise RuntimeError("Missing [project] section in pyproject.toml")
project_block = m.group(0)
ver_pat = r'(?m)^(\s*version\s*=\s*")([^"]+)(")\s*$'
new_block, count = re.subn(
ver_pat,
lambda mm: f"{mm.group(1)}{new_version}{mm.group(3)}",
project_block,
)
if count == 0:
raise RuntimeError("Missing version key in [project] section")
new_content = content[: m.start()] + new_block + content[m.end() :]
if preview:
print(f"[PREVIEW] Would update pyproject.toml version to {new_version}")
return
with open(pyproject_path, "w", encoding="utf-8") as f:
f.write(new_content)
print(f"Updated pyproject.toml version to {new_version}")

View File

@@ -0,0 +1,67 @@
from __future__ import annotations
import os
from datetime import datetime
from typing import Optional
from .debian import _get_debian_author
def update_spec_changelog(
spec_path: str,
package_name: str,
new_version: str,
message: Optional[str] = None,
preview: bool = False,
) -> None:
if not os.path.exists(spec_path):
print("[INFO] RPM spec file not found, skipping spec changelog update.")
return
try:
with open(spec_path, "r", encoding="utf-8") as f:
content = f.read()
except Exception as exc:
print(f"[WARN] Could not read spec file for changelog update: {exc}")
return
debian_version = f"{new_version}-1"
now = datetime.now().astimezone()
date_str = now.strftime("%a %b %d %Y")
author_name, author_email = _get_debian_author()
body_line = message.strip() if message else f"Automated release {new_version}."
stanza = (
f"* {date_str} {author_name} <{author_email}> - {debian_version}\n"
f"- {body_line}\n\n"
)
marker = "%changelog"
idx = content.find(marker)
if idx == -1:
new_content = content.rstrip() + "\n\n%changelog\n" + stanza
else:
before = content[: idx + len(marker)]
after = content[idx + len(marker) :]
new_content = before + "\n" + stanza + after.lstrip("\n")
if preview:
print(
"[PREVIEW] Would update RPM %changelog section with the following stanza:\n"
f"{stanza}"
)
return
try:
with open(spec_path, "w", encoding="utf-8") as f:
f.write(new_content)
except Exception as exc:
print(f"[WARN] Failed to write updated spec changelog section: {exc}")
return
print(
f"Updated RPM %changelog section in {os.path.basename(spec_path)} "
f"for {package_name} {debian_version}"
)

View File

@@ -0,0 +1,66 @@
from __future__ import annotations
import os
import re
def update_spec_version(
spec_path: str, new_version: str, preview: bool = False
) -> None:
"""
Update the version in an RPM spec file, if present.
"""
if not os.path.exists(spec_path):
print("[INFO] RPM spec file not found, skipping.")
return
try:
with open(spec_path, "r", encoding="utf-8") as f:
content = f.read()
except Exception as exc:
print(f"[WARN] Could not read spec file: {exc}")
return
ver_pattern = r"^(Version:\s*)(.+)$"
new_content, ver_count = re.subn(
ver_pattern,
lambda m: f"{m.group(1)}{new_version}",
content,
flags=re.MULTILINE,
)
if ver_count == 0:
print("[WARN] No 'Version:' line found in spec file.")
rel_pattern = r"^(Release:\s*)(.+)$"
def _release_repl(m: re.Match[str]) -> str:
rest = m.group(2).strip()
match = re.match(r"^(\d+)(.*)$", rest)
suffix = match.group(2) if match else ""
return f"{m.group(1)}1{suffix}"
new_content, rel_count = re.subn(
rel_pattern,
_release_repl,
new_content,
flags=re.MULTILINE,
)
if rel_count == 0:
print("[WARN] No 'Release:' line found in spec file.")
if preview:
print(
"[PREVIEW] Would update spec file "
f"{os.path.basename(spec_path)} to Version: {new_version}, Release: 1..."
)
return
with open(spec_path, "w", encoding="utf-8") as f:
f.write(new_content)
print(
f"Updated spec file {os.path.basename(spec_path)} "
f"to Version: {new_version}, Release: 1..."
)

View File

@@ -80,7 +80,9 @@ def is_highest_version_tag(tag: str) -> bool:
return True
latest = max(parsed_all)
print(f"[INFO] Latest tag (parsed): v{'.'.join(map(str, latest))}, Current tag: {tag}")
print(
f"[INFO] Latest tag (parsed): v{'.'.join(map(str, latest))}, Current tag: {tag}"
)
return parsed_current >= latest
@@ -93,7 +95,9 @@ def update_latest_tag(new_tag: str, *, preview: bool = False) -> None:
- 'latest' is forced (floating tag), therefore the push uses --force.
"""
target_ref = f"{new_tag}^{{}}"
print(f"[INFO] Updating 'latest' tag to point at {new_tag} (commit {target_ref})...")
print(
f"[INFO] Updating 'latest' tag to point at {new_tag} (commit {target_ref})..."
)
tag_force_annotated(
name="latest",

View File

@@ -76,7 +76,9 @@ def _release_impl(
if paths.arch_pkgbuild:
update_pkgbuild_version(paths.arch_pkgbuild, new_ver_str, preview=preview)
else:
print("[INFO] No PKGBUILD found (packaging/arch/PKGBUILD or PKGBUILD). Skipping.")
print(
"[INFO] No PKGBUILD found (packaging/arch/PKGBUILD or PKGBUILD). Skipping."
)
if paths.rpm_spec:
update_spec_version(paths.rpm_spec, new_ver_str, preview=preview)
@@ -123,7 +125,9 @@ def _release_impl(
paths.rpm_spec,
paths.debian_changelog,
]
existing_files = [p for p in files_to_add if isinstance(p, str) and p and os.path.exists(p)]
existing_files = [
p for p in files_to_add if isinstance(p, str) and p and os.path.exists(p)
]
if preview:
add(existing_files, preview=True)
@@ -135,13 +139,17 @@ def _release_impl(
if is_highest_version_tag(new_tag):
update_latest_tag(new_tag, preview=True)
else:
print(f"[PREVIEW] Skipping 'latest' update (tag {new_tag} is not the highest).")
print(
f"[PREVIEW] Skipping 'latest' update (tag {new_tag} is not the highest)."
)
if close and branch not in ("main", "master"):
if force:
print(f"[PREVIEW] Would delete branch {branch} (forced).")
else:
print(f"[PREVIEW] Would ask whether to delete branch {branch} after release.")
print(
f"[PREVIEW] Would ask whether to delete branch {branch} after release."
)
return
add(existing_files, preview=False)
@@ -157,7 +165,9 @@ def _release_impl(
if is_highest_version_tag(new_tag):
update_latest_tag(new_tag, preview=False)
else:
print(f"[INFO] Skipping 'latest' update (tag {new_tag} is not the highest).")
print(
f"[INFO] Skipping 'latest' update (tag {new_tag} is not the highest)."
)
except GitRunError as exc:
print(f"[WARN] Failed to update floating 'latest' tag for {new_tag}: {exc}")
print("'latest' tag was not updated.")
@@ -166,7 +176,9 @@ def _release_impl(
if close:
if branch in ("main", "master"):
print(f"[INFO] close=True but current branch is {branch}; skipping branch deletion.")
print(
f"[INFO] close=True but current branch is {branch}; skipping branch deletion."
)
return
if not should_delete_branch(force=force):

View File

@@ -55,7 +55,9 @@ def clone_repos(
clone_url = _build_clone_url(repo, clone_mode)
if not clone_url:
print(f"[WARNING] Cannot build clone URL for '{repo_identifier}'. Skipping.")
print(
f"[WARNING] Cannot build clone URL for '{repo_identifier}'. Skipping."
)
continue
shallow = clone_mode == "shallow"
@@ -84,7 +86,11 @@ def clone_repos(
continue
print(f"[WARNING] SSH clone failed for '{repo_identifier}': {exc}")
choice = input("Do you want to attempt HTTPS clone instead? (y/N): ").strip().lower()
choice = (
input("Do you want to attempt HTTPS clone instead? (y/N): ")
.strip()
.lower()
)
if choice != "y":
print(f"[INFO] HTTPS clone not attempted for '{repo_identifier}'.")
continue

View File

@@ -63,6 +63,4 @@ def _strip_git_suffix(name: str) -> str:
def _ensure_valid_repo_name(name: str) -> None:
if not _NAME_RE.fullmatch(name):
raise ValueError(
"Repository name must match: lowercase a-z, 0-9, '_' and '-'."
)
raise ValueError("Repository name must match: lowercase a-z, 0-9, '_' and '-'.")

View File

@@ -66,9 +66,7 @@ class TemplateRenderer:
for root, _, files in os.walk(self.templates_dir):
for fn in files:
if fn.endswith(".j2"):
rel = os.path.relpath(
os.path.join(root, fn), self.templates_dir
)
rel = os.path.relpath(os.path.join(root, fn), self.templates_dir)
print(f"[Preview] Would render template: {rel} -> {rel[:-3]}")
@staticmethod

View File

@@ -24,9 +24,13 @@ def deinstall_repos(
# Remove alias link/file (interactive)
if os.path.exists(alias_path):
confirm = input(
f"Are you sure you want to delete link '{alias_path}' for {repo_identifier}? [y/N]: "
).strip().lower()
confirm = (
input(
f"Are you sure you want to delete link '{alias_path}' for {repo_identifier}? [y/N]: "
)
.strip()
.lower()
)
if confirm == "y":
if preview:
print(f"[Preview] Would remove link '{alias_path}'.")

View File

@@ -3,19 +3,30 @@ import os
from pkgmgr.core.repository.identifier import get_repo_identifier
from pkgmgr.core.repository.dir import get_repo_dir
def delete_repos(selected_repos, repositories_base_dir, all_repos, preview=False):
for repo in selected_repos:
repo_identifier = get_repo_identifier(repo, all_repos)
repo_dir = get_repo_dir(repositories_base_dir, repo)
if os.path.exists(repo_dir):
confirm = input(f"Are you sure you want to delete directory '{repo_dir}' for {repo_identifier}? [y/N]: ").strip().lower()
confirm = (
input(
f"Are you sure you want to delete directory '{repo_dir}' for {repo_identifier}? [y/N]: "
)
.strip()
.lower()
)
if confirm == "y":
if preview:
print(f"[Preview] Would delete directory '{repo_dir}' for {repo_identifier}.")
print(
f"[Preview] Would delete directory '{repo_dir}' for {repo_identifier}."
)
else:
try:
shutil.rmtree(repo_dir)
print(f"Deleted repository directory '{repo_dir}' for {repo_identifier}.")
print(
f"Deleted repository directory '{repo_dir}' for {repo_identifier}."
)
except Exception as e:
print(f"Error deleting '{repo_dir}' for {repo_identifier}: {e}")
else:

View File

@@ -233,9 +233,7 @@ def list_repositories(
categories.append(str(repo["category"]))
yaml_tags: List[str] = list(map(str, repo.get("tags", [])))
display_tags: List[str] = sorted(
set(yaml_tags + list(map(str, extra_tags)))
)
display_tags: List[str] = sorted(set(yaml_tags + list(map(str, extra_tags))))
rows.append(
{
@@ -288,13 +286,7 @@ def list_repositories(
status_padded = status.ljust(status_width)
status_colored = _color_status(status_padded)
print(
f"{ident_col} "
f"{status_colored} "
f"{cat_col} "
f"{tag_col} "
f"{dir_col}"
)
print(f"{ident_col} {status_colored} {cat_col} {tag_col} {dir_col}")
# ------------------------------------------------------------------
# Detailed section (alias value red, same status coloring)

View File

@@ -55,12 +55,16 @@ class UpdateManager:
code = exc.code if isinstance(exc.code, int) else str(exc.code)
failures.append((identifier, f"pull failed (exit={code})"))
if not quiet:
print(f"[Warning] update: pull failed for {identifier} (exit={code}). Continuing...")
print(
f"[Warning] update: pull failed for {identifier} (exit={code}). Continuing..."
)
continue
except Exception as exc:
failures.append((identifier, f"pull failed: {exc}"))
if not quiet:
print(f"[Warning] update: pull failed for {identifier}: {exc}. Continuing...")
print(
f"[Warning] update: pull failed for {identifier}: {exc}. Continuing..."
)
continue
try:
@@ -82,12 +86,16 @@ class UpdateManager:
code = exc.code if isinstance(exc.code, int) else str(exc.code)
failures.append((identifier, f"install failed (exit={code})"))
if not quiet:
print(f"[Warning] update: install failed for {identifier} (exit={code}). Continuing...")
print(
f"[Warning] update: install failed for {identifier} (exit={code}). Continuing..."
)
continue
except Exception as exc:
failures.append((identifier, f"install failed: {exc}"))
if not quiet:
print(f"[Warning] update: install failed for {identifier}: {exc}. Continuing...")
print(
f"[Warning] update: install failed for {identifier}: {exc}. Continuing..."
)
continue
if failures and not quiet:

View File

@@ -31,6 +31,7 @@ class OSReleaseInfo:
"""
Minimal /etc/os-release representation for distro detection.
"""
id: str = ""
id_like: str = ""
pretty_name: str = ""
@@ -63,4 +64,6 @@ class OSReleaseInfo:
def is_fedora_family(self) -> bool:
ids = self.ids()
return bool(ids.intersection({"fedora", "rhel", "centos", "rocky", "almalinux"}))
return bool(
ids.intersection({"fedora", "rhel", "centos", "rocky", "almalinux"})
)

View File

@@ -58,7 +58,9 @@ class SystemUpdater:
run_command("sudo pacman -Syu --noconfirm", preview=preview)
return
print("[Warning] Cannot update Arch system: missing required tools (sudo/yay/pacman).")
print(
"[Warning] Cannot update Arch system: missing required tools (sudo/yay/pacman)."
)
def _update_debian(self, *, preview: bool) -> None:
from pkgmgr.core.command.run import run_command
@@ -67,7 +69,9 @@ class SystemUpdater:
apt_get = shutil.which("apt-get")
if not (sudo and apt_get):
print("[Warning] Cannot update Debian/Ubuntu system: missing required tools (sudo/apt-get).")
print(
"[Warning] Cannot update Debian/Ubuntu system: missing required tools (sudo/apt-get)."
)
return
env = "DEBIAN_FRONTEND=noninteractive"

View File

@@ -29,6 +29,7 @@ For details on any command, run:
\033[1mpkgmgr <command> --help\033[0m
"""
def main() -> None:
"""
Entry point for the pkgmgr CLI.
@@ -41,9 +42,7 @@ def main() -> None:
repositories_dir = os.path.expanduser(
directories.get("repositories", "~/Repositories")
)
binaries_dir = os.path.expanduser(
directories.get("binaries", "~/.local/bin")
)
binaries_dir = os.path.expanduser(directories.get("binaries", "~/.local/bin"))
# Ensure the merged config actually contains the resolved directories
config_merged.setdefault("directories", {})

View File

@@ -135,9 +135,7 @@ def handle_changelog(
target_tag=range_arg,
)
if cur_tag is None:
print(
f"[WARN] Tag {range_arg!r} not found or not a SemVer tag."
)
print(f"[WARN] Tag {range_arg!r} not found or not a SemVer tag.")
print("[INFO] Falling back to full history.")
from_ref = None
to_ref = None

View File

@@ -213,9 +213,7 @@ def handle_config(args, ctx: CLIContext) -> None:
)
if key == mod_key:
entry["ignore"] = args.set == "true"
print(
f"Set ignore for {key} to {entry['ignore']}"
)
print(f"Set ignore for {key} to {entry['ignore']}")
save_user_config(user_config, user_config_path)
return

View File

@@ -4,7 +4,12 @@ from __future__ import annotations
import sys
from typing import Any, Dict, List
from pkgmgr.actions.mirror import diff_mirrors, list_mirrors, merge_mirrors, setup_mirrors
from pkgmgr.actions.mirror import (
diff_mirrors,
list_mirrors,
merge_mirrors,
setup_mirrors,
)
from pkgmgr.cli.context import CLIContext
Repository = Dict[str, Any]
@@ -56,11 +61,15 @@ def handle_mirror_command(
preview = getattr(args, "preview", False)
if source == target:
print("[ERROR] For 'mirror merge', source and target must differ (config vs file).")
print(
"[ERROR] For 'mirror merge', source and target must differ (config vs file)."
)
sys.exit(2)
explicit_config_path = getattr(args, "config_path", None)
user_config_path = explicit_config_path or getattr(ctx, "user_config_path", None)
user_config_path = explicit_config_path or getattr(
ctx, "user_config_path", None
)
merge_mirrors(
selected_repos=selected,

View File

@@ -18,7 +18,9 @@ def handle_publish(args, ctx: CLIContext, selected: List[Repository]) -> None:
for repo in selected:
identifier = get_repo_identifier(repo, ctx.all_repositories)
repo_dir = repo.get("directory") or get_repo_dir(ctx.repositories_base_dir, repo)
repo_dir = repo.get("directory") or get_repo_dir(
ctx.repositories_base_dir, repo
)
if not os.path.isdir(repo_dir):
print(f"[WARN] Skipping {identifier}: directory missing.")

View File

@@ -36,9 +36,13 @@ def handle_release(
identifier = get_repo_identifier(repo, ctx.all_repositories)
try:
repo_dir = repo.get("directory") or get_repo_dir(ctx.repositories_base_dir, repo)
repo_dir = repo.get("directory") or get_repo_dir(
ctx.repositories_base_dir, repo
)
except Exception as exc:
print(f"[WARN] Skipping repository {identifier}: failed to resolve directory: {exc}")
print(
f"[WARN] Skipping repository {identifier}: failed to resolve directory: {exc}"
)
continue
if not os.path.isdir(repo_dir):

View File

@@ -32,9 +32,8 @@ def _resolve_repository_directory(repository: Repository, ctx: CLIContext) -> st
if repo_dir:
return repo_dir
base_dir = (
getattr(ctx, "repositories_base_dir", None)
or getattr(ctx, "repositories_dir", None)
base_dir = getattr(ctx, "repositories_base_dir", None) or getattr(
ctx, "repositories_dir", None
)
if not base_dir:
raise RuntimeError(

View File

@@ -38,9 +38,9 @@ def _print_pkgmgr_self_version() -> None:
# Common distribution/module naming variants.
python_candidates = [
"package-manager", # PyPI dist name in your project
"package_manager", # module-ish variant
"pkgmgr", # console/alias-ish
"package-manager", # PyPI dist name in your project
"package_manager", # module-ish variant
"pkgmgr", # console/alias-ish
]
nix_candidates = [
"pkgmgr",

View File

@@ -33,8 +33,7 @@ def add_branch_subparsers(
"name",
nargs="?",
help=(
"Name of the new branch (optional; will be asked interactively "
"if omitted)"
"Name of the new branch (optional; will be asked interactively if omitted)"
),
)
branch_open.add_argument(
@@ -54,8 +53,7 @@ def add_branch_subparsers(
"name",
nargs="?",
help=(
"Name of the branch to close (optional; current branch is used "
"if omitted)"
"Name of the branch to close (optional; current branch is used if omitted)"
),
)
branch_close.add_argument(
@@ -84,8 +82,7 @@ def add_branch_subparsers(
"name",
nargs="?",
help=(
"Name of the branch to drop (optional; current branch is used "
"if omitted)"
"Name of the branch to drop (optional; current branch is used if omitted)"
),
)
branch_drop.add_argument(

View File

@@ -20,7 +20,9 @@ def add_mirror_subparsers(subparsers: argparse._SubParsersAction) -> None:
required=True,
)
mirror_list = mirror_subparsers.add_parser("list", help="List configured mirrors for repositories")
mirror_list = mirror_subparsers.add_parser(
"list", help="List configured mirrors for repositories"
)
add_identifier_arguments(mirror_list)
mirror_list.add_argument(
"--source",
@@ -29,15 +31,21 @@ def add_mirror_subparsers(subparsers: argparse._SubParsersAction) -> None:
help="Which mirror source to show.",
)
mirror_diff = mirror_subparsers.add_parser("diff", help="Show differences between config mirrors and MIRRORS file")
mirror_diff = mirror_subparsers.add_parser(
"diff", help="Show differences between config mirrors and MIRRORS file"
)
add_identifier_arguments(mirror_diff)
mirror_merge = mirror_subparsers.add_parser(
"merge",
help="Merge mirrors between config and MIRRORS file (example: pkgmgr mirror merge config file --all)",
)
mirror_merge.add_argument("source", choices=["config", "file"], help="Source of mirrors.")
mirror_merge.add_argument("target", choices=["config", "file"], help="Target of mirrors.")
mirror_merge.add_argument(
"source", choices=["config", "file"], help="Source of mirrors."
)
mirror_merge.add_argument(
"target", choices=["config", "file"], help="Target of mirrors."
)
add_identifier_arguments(mirror_merge)
mirror_merge.add_argument(
"--config-path",

View File

@@ -48,9 +48,6 @@ def add_navigation_subparsers(
"--command",
nargs=argparse.REMAINDER,
dest="shell_command",
help=(
"The shell command (and its arguments) to execute in each "
"repository"
),
help=("The shell command (and its arguments) to execute in each repository"),
default=[],
)

View File

@@ -53,10 +53,7 @@ def _add_proxy_identifier_arguments(parser: argparse.ArgumentParser) -> None:
parser.add_argument(
"identifiers",
nargs="*",
help=(
"Identifier(s) for repositories. "
"Default: Repository of current folder."
),
help=("Identifier(s) for repositories. Default: Repository of current folder."),
)
parser.add_argument(
"--all",
@@ -118,12 +115,7 @@ def _proxy_has_explicit_selection(args: argparse.Namespace) -> bool:
string_filter = getattr(args, "string", "") or ""
# Proxy commands currently do not support --tag, so it is not checked here.
return bool(
use_all
or identifiers
or categories
or string_filter
)
return bool(use_all or identifiers or categories or string_filter)
def _select_repo_for_current_directory(
@@ -204,9 +196,7 @@ def maybe_handle_proxy(args: argparse.Namespace, ctx: CLIContext) -> bool:
If the top-level command is one of the proxy subcommands
(git / docker / docker compose), handle it here and return True.
"""
all_proxy_subcommands = {
sub for subs in PROXY_COMMANDS.values() for sub in subs
}
all_proxy_subcommands = {sub for subs in PROXY_COMMANDS.values() for sub in subs}
if args.command not in all_proxy_subcommands:
return False

View File

@@ -22,9 +22,8 @@ def resolve_repository_path(repository: Repository, ctx: CLIContext) -> str:
if value:
return value
base_dir = (
getattr(ctx, "repositories_base_dir", None)
or getattr(ctx, "repositories_dir", None)
base_dir = getattr(ctx, "repositories_base_dir", None) or getattr(
ctx, "repositories_dir", None
)
if not base_dir:
raise RuntimeError(

View File

@@ -57,7 +57,9 @@ def _build_workspace_filename(identifiers: List[str]) -> str:
return "_".join(sorted_identifiers) + ".code-workspace"
def _build_workspace_data(selected: List[Repository], ctx: CLIContext) -> Dict[str, Any]:
def _build_workspace_data(
selected: List[Repository], ctx: CLIContext
) -> Dict[str, Any]:
folders = [{"path": resolve_repository_path(repo, ctx)} for repo in selected]
return {
"folders": folders,

View File

@@ -2,6 +2,7 @@ import os
import hashlib
import re
def generate_alias(repo, bin_dir, existing_aliases):
"""
Generate an alias for a repository based on its repository name.

View File

@@ -98,8 +98,7 @@ def create_ink(
if alias_name == repo_identifier:
if not quiet:
print(
f"Alias '{alias_name}' equals identifier. "
"Skipping alias creation."
f"Alias '{alias_name}' equals identifier. Skipping alias creation."
)
return

View File

@@ -8,6 +8,7 @@ class CliLayer(str, Enum):
"""
CLI layer precedence (lower number = stronger layer).
"""
OS_PACKAGES = "os-packages"
NIX = "nix"
PYTHON = "python"

View File

@@ -34,11 +34,7 @@ def _nix_binary_candidates(home: str, names: List[str]) -> List[str]:
"""
Build possible Nix profile binary paths for a list of candidate names.
"""
return [
os.path.join(home, ".nix-profile", "bin", name)
for name in names
if name
]
return [os.path.join(home, ".nix-profile", "bin", name) for name in names if name]
def _path_binary_candidates(names: List[str]) -> List[str]:
@@ -148,7 +144,8 @@ def resolve_command_for_repo(
# c) Nix profile binaries
nix_binaries = [
path for path in _nix_binary_candidates(home, candidate_names)
path
for path in _nix_binary_candidates(home, candidate_names)
if _is_executable(path)
]
nix_binary = nix_binaries[0] if nix_binaries else None

View File

@@ -51,6 +51,7 @@ Repo = Dict[str, Any]
# Hilfsfunktionen
# ---------------------------------------------------------------------------
def _deep_merge(base: Dict[str, Any], override: Dict[str, Any]) -> Dict[str, Any]:
"""
Recursively merge two dictionaries.
@@ -58,11 +59,7 @@ def _deep_merge(base: Dict[str, Any], override: Dict[str, Any]) -> Dict[str, Any
Values from `override` win over values in `base`.
"""
for key, value in override.items():
if (
key in base
and isinstance(base[key], dict)
and isinstance(value, dict)
):
if key in base and isinstance(base[key], dict) and isinstance(value, dict):
_deep_merge(base[key], value)
else:
base[key] = value
@@ -93,9 +90,7 @@ def _merge_repo_lists(
- Wenn category_name gesetzt ist, wird dieser in
repo["category_files"] eingetragen.
"""
index: Dict[Tuple[str, str, str], Repo] = {
_repo_key(r): r for r in base_list
}
index: Dict[Tuple[str, str, str], Repo] = {_repo_key(r): r for r in base_list}
for src in new_list:
key = _repo_key(src)
@@ -233,10 +228,12 @@ def _load_defaults_from_package_or_project() -> Dict[str, Any]:
return {"directories": {}, "repositories": []}
# ---------------------------------------------------------------------------
# Hauptfunktion
# ---------------------------------------------------------------------------
def load_config(user_config_path: str) -> Dict[str, Any]:
"""
Load and merge configuration for pkgmgr.
@@ -289,8 +286,12 @@ def load_config(user_config_path: str) -> Dict[str, Any]:
# repositories
merged["repositories"] = []
_merge_repo_lists(merged["repositories"], defaults["repositories"], category_name=None)
_merge_repo_lists(merged["repositories"], user_cfg["repositories"], category_name=None)
_merge_repo_lists(
merged["repositories"], defaults["repositories"], category_name=None
)
_merge_repo_lists(
merged["repositories"], user_cfg["repositories"], category_name=None
)
# andere Top-Level-Keys (falls vorhanden)
other_keys = (set(defaults.keys()) | set(user_cfg.keys())) - {

View File

@@ -1,9 +1,10 @@
import yaml
import os
def save_user_config(user_config,USER_CONFIG_PATH:str):
def save_user_config(user_config, USER_CONFIG_PATH: str):
"""Save the user configuration to USER_CONFIG_PATH."""
os.makedirs(os.path.dirname(USER_CONFIG_PATH), exist_ok=True)
with open(USER_CONFIG_PATH, 'w') as f:
with open(USER_CONFIG_PATH, "w") as f:
yaml.dump(user_config, f)
print(f"User configuration updated in {USER_CONFIG_PATH}.")

View File

@@ -16,7 +16,9 @@ class EnvTokenProvider:
source_name: str = "env"
def get(self, request: TokenRequest) -> Optional[TokenResult]:
for key in env_var_candidates(request.provider_kind, request.host, request.owner):
for key in env_var_candidates(
request.provider_kind, request.host, request.owner
):
val = os.environ.get(key)
if val:
return TokenResult(token=val.strip(), source=self.source_name)

View File

@@ -15,6 +15,7 @@ class GhTokenProvider:
This does NOT persist anything; it only reads what `gh` already knows.
"""
source_name: str = "gh"
def get(self, request: TokenRequest) -> Optional[TokenResult]:

View File

@@ -21,9 +21,7 @@ def _import_keyring():
try:
import keyring # type: ignore
except Exception as exc: # noqa: BLE001
raise KeyringUnavailableError(
"python-keyring is not installed."
) from exc
raise KeyringUnavailableError("python-keyring is not installed.") from exc
# Some environments have keyring installed but no usable backend.
# We do a lightweight "backend sanity check" by attempting to read the backend.

View File

@@ -9,7 +9,12 @@ from .providers.env import EnvTokenProvider
from .providers.gh import GhTokenProvider
from .providers.keyring import KeyringTokenProvider
from .providers.prompt import PromptTokenProvider
from .types import KeyringUnavailableError, NoCredentialsError, TokenRequest, TokenResult
from .types import (
KeyringUnavailableError,
NoCredentialsError,
TokenRequest,
TokenResult,
)
from .validate import validate_token
@@ -55,7 +60,10 @@ class TokenResolver:
print(f" {msg}", file=sys.stderr)
print(" Tokens will NOT be persisted securely.", file=sys.stderr)
print("", file=sys.stderr)
print(" To enable secure token storage, install python-keyring:", file=sys.stderr)
print(
" To enable secure token storage, install python-keyring:",
file=sys.stderr,
)
print(" pip install keyring", file=sys.stderr)
print("", file=sys.stderr)
print(" Or install via system packages:", file=sys.stderr)

View File

@@ -13,7 +13,9 @@ class KeyringKey:
username: str
def build_keyring_key(provider_kind: str, host: str, owner: Optional[str]) -> KeyringKey:
def build_keyring_key(
provider_kind: str, host: str, owner: Optional[str]
) -> KeyringKey:
"""Build a stable keyring key.
- service: "pkgmgr:<provider>"
@@ -21,11 +23,15 @@ def build_keyring_key(provider_kind: str, host: str, owner: Optional[str]) -> Ke
"""
provider_kind = str(provider_kind).strip().lower()
host = str(host).strip()
owner_part = (str(owner).strip() if owner else "-")
return KeyringKey(service=f"pkgmgr:{provider_kind}", username=f"{host}|{owner_part}")
owner_part = str(owner).strip() if owner else "-"
return KeyringKey(
service=f"pkgmgr:{provider_kind}", username=f"{host}|{owner_part}"
)
def env_var_candidates(provider_kind: str, host: str, owner: Optional[str]) -> list[str]:
def env_var_candidates(
provider_kind: str, host: str, owner: Optional[str]
) -> list[str]:
"""Return a list of environment variable names to try.
Order is from most specific to most generic.

View File

@@ -18,4 +18,6 @@ def add_all(*, cwd: str = ".", preview: bool = False) -> None:
try:
run(["add", "-A"], cwd=cwd, preview=preview)
except GitRunError as exc:
raise GitAddAllError("Failed to stage all changes with `git add -A`.", cwd=cwd) from exc
raise GitAddAllError(
"Failed to stage all changes with `git add -A`.", cwd=cwd
) from exc

View File

@@ -18,4 +18,6 @@ def branch_move(branch: str, *, cwd: str = ".", preview: bool = False) -> None:
try:
run(["branch", "-M", branch], cwd=cwd, preview=preview)
except GitRunError as exc:
raise GitBranchMoveError(f"Failed to move/rename current branch to {branch!r}.", cwd=cwd) from exc
raise GitBranchMoveError(
f"Failed to move/rename current branch to {branch!r}.", cwd=cwd
) from exc

View File

@@ -4,21 +4,26 @@ from __future__ import annotations
class GitBaseError(RuntimeError):
"""Base error raised for Git related failures."""
class GitRunError(GitBaseError):
"""Base error raised for Git related failures."""
class GitNotRepositoryError(GitBaseError):
"""Raised when the current working directory is not a git repository."""
class GitQueryError(GitRunError):
"""Base class for read-only git query failures."""
class GitCommandError(GitRunError):
"""
Base class for state-changing git command failures.
Use subclasses to provide stable error types for callers.
"""
def __init__(self, message: str, *, cwd: str = ".") -> None:
super().__init__(message)
if cwd in locals():

View File

@@ -16,6 +16,7 @@ def _is_missing_key_error(exc: GitRunError) -> bool:
# 'git config --get' returns exit code 1 when the key is not set.
return "exit code: 1" in msg
def get_config_value(key: str, *, cwd: str = ".") -> Optional[str]:
"""
Return a value from `git config --get <key>`, or None if not set.

View File

@@ -30,4 +30,4 @@ def get_remote_head_commit(
) from exc
# minimal parsing: first token is the hash
return (out.split()[0].strip() if out else "")
return out.split()[0].strip() if out else ""

View File

@@ -4,6 +4,7 @@ from typing import Set
from ..run import run
def get_remote_push_urls(remote: str, cwd: str = ".") -> Set[str]:
"""
Return all push URLs configured for a remote.

View File

@@ -44,9 +44,7 @@ def run(
stderr = exc.stderr or ""
if _is_not_repo_error(stderr):
raise GitNotRepositoryError(
f"Not a git repository: {cwd!r}\n"
f"Command: {cmd_str}\n"
f"STDERR:\n{stderr}"
f"Not a git repository: {cwd!r}\nCommand: {cmd_str}\nSTDERR:\n{stderr}"
) from exc
raise GitRunError(

View File

@@ -34,7 +34,15 @@ def get_repo_dir(repositories_base_dir: str, repo: Dict[str, Any]) -> str:
account = repo.get("account")
repository = repo.get("repository")
missing = [k for k, v in [("provider", provider), ("account", account), ("repository", repository)] if not v]
missing = [
k
for k, v in [
("provider", provider),
("account", account),
("repository", repository),
]
if not v
]
if missing:
print(
"Error: repository entry is missing required keys.\n"

View File

@@ -9,4 +9,4 @@ def get_repo_identifier(repo, all_repos):
if count == 1:
return repo_name
else:
return f'{repo.get("provider")}/{repo.get("account")}/{repo.get("repository")}'
return f"{repo.get('provider')}/{repo.get('account')}/{repo.get('repository')}"

View File

@@ -109,7 +109,9 @@ def resolve_repo_paths(repo_dir: str) -> RepoPaths:
]
)
if rpm_spec is None:
rpm_spec = _find_first_spec_in_dir(os.path.join(repo_dir, "packaging", "fedora"))
rpm_spec = _find_first_spec_in_dir(
os.path.join(repo_dir, "packaging", "fedora")
)
if rpm_spec is None:
rpm_spec = _find_first_spec_in_dir(repo_dir)

View File

@@ -1,5 +1,4 @@
def resolve_repos(identifiers:[], all_repos:[]):
def resolve_repos(identifiers: [], all_repos: []):
"""
Given a list of identifier strings, return a list of repository configs.
The identifier can be:
@@ -11,7 +10,9 @@ def resolve_repos(identifiers:[], all_repos:[]):
for ident in identifiers:
matches = []
for repo in all_repos:
full_id = f'{repo.get("provider")}/{repo.get("account")}/{repo.get("repository")}'
full_id = (
f"{repo.get('provider')}/{repo.get('account')}/{repo.get('repository')}"
)
if ident == full_id:
matches.append(repo)
elif ident == repo.get("alias"):

View File

@@ -66,18 +66,26 @@ def verify_repository(repo, repo_dir, mode="local", no_verification=False):
if expected_commit:
if not commit_hash:
commit_check_passed = False
error_details.append(f"Expected commit: {expected_commit}, but could not determine current commit.")
error_details.append(
f"Expected commit: {expected_commit}, but could not determine current commit."
)
elif commit_hash != expected_commit:
commit_check_passed = False
error_details.append(f"Expected commit: {expected_commit}, found: {commit_hash}")
error_details.append(
f"Expected commit: {expected_commit}, found: {commit_hash}"
)
if expected_gpg_keys:
if not signing_key:
gpg_check_passed = False
error_details.append(f"Expected one of GPG keys: {expected_gpg_keys}, but no signing key was found.")
error_details.append(
f"Expected one of GPG keys: {expected_gpg_keys}, but no signing key was found."
)
elif signing_key not in expected_gpg_keys:
gpg_check_passed = False
error_details.append(f"Expected one of GPG keys: {expected_gpg_keys}, found: {signing_key}")
error_details.append(
f"Expected one of GPG keys: {expected_gpg_keys}, found: {signing_key}"
)
if expected_commit and expected_gpg_keys:
verified_ok = commit_check_passed and gpg_check_passed

Some files were not shown because too many files have changed in this diff Show More