Compare commits
13 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9357c4632e | ||
|
|
ca5d0d22f3 | ||
|
|
3875338fb7 | ||
|
|
196f55c58e | ||
|
|
9a149715f6 | ||
|
|
bf40533469 | ||
|
|
7bc7259988 | ||
|
|
66b96ac3a5 | ||
|
|
f974e0b14a | ||
|
|
de8c3f768d | ||
|
|
05ff250251 | ||
|
|
ab52d37467 | ||
|
|
80329b85fb |
2
.github/workflows/test-container.yml
vendored
2
.github/workflows/test-container.yml
vendored
@@ -1,4 +1,4 @@
|
||||
name: Test Distribution Containers
|
||||
name: Test OS Containers
|
||||
|
||||
on:
|
||||
push:
|
||||
|
||||
2
.github/workflows/test-e2e.yml
vendored
2
.github/workflows/test-e2e.yml
vendored
@@ -1,4 +1,4 @@
|
||||
name: Test package-manager (e2e)
|
||||
name: Test End-To-End
|
||||
|
||||
on:
|
||||
push:
|
||||
|
||||
8
.github/workflows/test-integration.yml
vendored
8
.github/workflows/test-integration.yml
vendored
@@ -1,4 +1,4 @@
|
||||
name: Test package-manager (integration)
|
||||
name: Test Code Integration
|
||||
|
||||
on:
|
||||
push:
|
||||
@@ -21,9 +21,5 @@ jobs:
|
||||
- name: Show Docker version
|
||||
run: docker version
|
||||
|
||||
# Build Arch test image (same as used in test-unit and test-e2e)
|
||||
- name: Build test images
|
||||
run: make build
|
||||
|
||||
- name: Run integration tests via make (Arch container)
|
||||
run: make test-integration
|
||||
run: make test-integration DISTROS="arch"
|
||||
|
||||
4
.github/workflows/test-unit.yml
vendored
4
.github/workflows/test-unit.yml
vendored
@@ -1,4 +1,4 @@
|
||||
name: Test package-manager (unit)
|
||||
name: Test Units
|
||||
|
||||
on:
|
||||
push:
|
||||
@@ -22,4 +22,4 @@ jobs:
|
||||
run: docker version
|
||||
|
||||
- name: Run unit tests via make (Arch container)
|
||||
run: make test-unit
|
||||
run: make test-unit DISTROS="arch"
|
||||
|
||||
35
CHANGELOG.md
35
CHANGELOG.md
@@ -1,3 +1,38 @@
|
||||
## [0.7.7] - 2025-12-09
|
||||
|
||||
* Added TEST_PATTERN parameter to execute dedicated tests
|
||||
|
||||
|
||||
## [0.7.6] - 2025-12-09
|
||||
|
||||
* Fixed pull --preview bug in e2e test
|
||||
|
||||
|
||||
## [0.7.5] - 2025-12-09
|
||||
|
||||
* Fixed wrong directory permissions for nix
|
||||
|
||||
|
||||
## [0.7.4] - 2025-12-09
|
||||
|
||||
* Fixed missing build in test workflow -> Tests pass now
|
||||
|
||||
|
||||
## [0.7.3] - 2025-12-09
|
||||
|
||||
* Fixed bug: Ignored packages are now ignored
|
||||
|
||||
|
||||
## [0.7.2] - 2025-12-09
|
||||
|
||||
* Implemented Changelog Support for Fedora and Debian
|
||||
|
||||
|
||||
## [0.7.1] - 2025-12-09
|
||||
|
||||
* Fix floating 'latest' tag logic: dereference annotated target (vX.Y.Z^{}), add tag message to avoid Git errors, ensure best-effort update without blocking releases, and update unit tests (see ChatGPT conversation: https://chatgpt.com/share/69383024-efa4-800f-a875-129b81fa40ff).
|
||||
|
||||
|
||||
## [0.7.0] - 2025-12-09
|
||||
|
||||
* Add Git helpers for branch sync and floating 'latest' tag in the release workflow, ensure main/master are updated from origin before tagging, and extend unit/e2e tests including 'pkgmgr release --help' coverage (see ChatGPT conversation: https://chatgpt.com/share/69383024-efa4-800f-a875-129b81fa40ff)
|
||||
|
||||
16
Makefile
16
Makefile
@@ -12,7 +12,7 @@ NIX_CACHE_VOLUME := pkgmgr_nix_cache
|
||||
# Distro list and base images
|
||||
# (kept for documentation/reference; actual build logic is in scripts/build)
|
||||
# ------------------------------------------------------------
|
||||
DISTROS := arch debian ubuntu fedora centos
|
||||
DISTROS := arch debian ubuntu fedora centos
|
||||
BASE_IMAGE_ARCH := archlinux:latest
|
||||
BASE_IMAGE_DEBIAN := debian:stable-slim
|
||||
BASE_IMAGE_UBUNTU := ubuntu:latest
|
||||
@@ -27,6 +27,10 @@ export BASE_IMAGE_UBUNTU
|
||||
export BASE_IMAGE_FEDORA
|
||||
export BASE_IMAGE_CENTOS
|
||||
|
||||
# PYthon Unittest Pattern
|
||||
TEST_PATTERN := test_*.py
|
||||
export TEST_PATTERN
|
||||
|
||||
# ------------------------------------------------------------
|
||||
# PKGMGR setup (developer wrapper -> scripts/installation/main.sh)
|
||||
# ------------------------------------------------------------
|
||||
@@ -46,16 +50,16 @@ build:
|
||||
# Test targets (delegated to scripts/test)
|
||||
# ------------------------------------------------------------
|
||||
|
||||
test-unit:
|
||||
test-unit: build-missing
|
||||
@bash scripts/test/test-unit.sh
|
||||
|
||||
test-integration:
|
||||
test-integration: build-missing
|
||||
@bash scripts/test/test-integration.sh
|
||||
|
||||
test-e2e:
|
||||
test-e2e: build-missing
|
||||
@bash scripts/test/test-e2e.sh
|
||||
|
||||
test-container:
|
||||
test-container: build-missing
|
||||
@bash scripts/test/test-container.sh
|
||||
|
||||
# ------------------------------------------------------------
|
||||
@@ -65,7 +69,7 @@ build-missing:
|
||||
@bash scripts/build/build-image-missing.sh
|
||||
|
||||
# Combined test target for local + CI (unit + e2e + integration)
|
||||
test: build-missing test-container test-unit test-e2e test-integration
|
||||
test: test-container test-unit test-e2e test-integration
|
||||
|
||||
# ------------------------------------------------------------
|
||||
# System install (native packages, calls scripts/installation/run-package.sh)
|
||||
|
||||
2
PKGBUILD
2
PKGBUILD
@@ -1,7 +1,7 @@
|
||||
# Maintainer: Kevin Veen-Birkenbach <info@veen.world>
|
||||
|
||||
pkgname=package-manager
|
||||
pkgver=0.7.0
|
||||
pkgver=0.7.7
|
||||
pkgrel=1
|
||||
pkgdesc="Local-flake wrapper for Kevin's package-manager (Nix-based)."
|
||||
arch=('any')
|
||||
|
||||
42
debian/changelog
vendored
42
debian/changelog
vendored
@@ -1,3 +1,45 @@
|
||||
package-manager (0.7.7-1) unstable; urgency=medium
|
||||
|
||||
* Added TEST_PATTERN parameter to execute dedicated tests
|
||||
|
||||
-- Kevin Veen-Birkenbach <kevin@veen.world> Tue, 09 Dec 2025 17:54:38 +0100
|
||||
|
||||
package-manager (0.7.6-1) unstable; urgency=medium
|
||||
|
||||
* Fixed pull --preview bug in e2e test
|
||||
|
||||
-- Kevin Veen-Birkenbach <kevin@veen.world> Tue, 09 Dec 2025 17:14:19 +0100
|
||||
|
||||
package-manager (0.7.5-1) unstable; urgency=medium
|
||||
|
||||
* Fixed wrong directory permissions for nix
|
||||
|
||||
-- Kevin Veen-Birkenbach <kevin@veen.world> Tue, 09 Dec 2025 16:45:42 +0100
|
||||
|
||||
package-manager (0.7.4-1) unstable; urgency=medium
|
||||
|
||||
* Fixed missing build in test workflow -> Tests pass now
|
||||
|
||||
-- Kevin Veen-Birkenbach <kevin@veen.world> Tue, 09 Dec 2025 16:22:00 +0100
|
||||
|
||||
package-manager (0.7.3-1) unstable; urgency=medium
|
||||
|
||||
* Fixed bug: Ignored packages are now ignored
|
||||
|
||||
-- Kevin Veen-Birkenbach <kevin@veen.world> Tue, 09 Dec 2025 16:08:31 +0100
|
||||
|
||||
package-manager (0.7.2-1) unstable; urgency=medium
|
||||
|
||||
* Implemented Changelog Support for Fedora and Debian
|
||||
|
||||
-- Kevin Veen-Birkenbach <kevin@veen.world> Tue, 09 Dec 2025 15:48:58 +0100
|
||||
|
||||
package-manager (0.7.1-1) unstable; urgency=medium
|
||||
|
||||
* Fix floating 'latest' tag logic: dereference annotated target (vX.Y.Z^{}), add tag message to avoid Git errors, ensure best-effort update without blocking releases, and update unit tests (see ChatGPT conversation: https://chatgpt.com/share/69383024-efa4-800f-a875-129b81fa40ff).
|
||||
|
||||
-- Kevin Veen-Birkenbach <kevin@veen.world> Tue, 09 Dec 2025 15:26:54 +0100
|
||||
|
||||
package-manager (0.7.0-1) unstable; urgency=medium
|
||||
|
||||
* Add Git helpers for branch sync and floating 'latest' tag in the release workflow, ensure main/master are updated from origin before tagging, and extend unit/e2e tests including 'pkgmgr release --help' coverage (see ChatGPT conversation: https://chatgpt.com/share/69383024-efa4-800f-a875-129b81fa40ff)
|
||||
|
||||
@@ -31,7 +31,7 @@
|
||||
rec {
|
||||
pkgmgr = pyPkgs.buildPythonApplication {
|
||||
pname = "package-manager";
|
||||
version = "0.7.0";
|
||||
version = "0.7.7";
|
||||
|
||||
# Use the git repo as source
|
||||
src = ./.;
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
Name: package-manager
|
||||
Version: 0.7.0
|
||||
Version: 0.7.7
|
||||
Release: 1%{?dist}
|
||||
Summary: Wrapper that runs Kevin's package-manager via Nix flake
|
||||
|
||||
@@ -77,5 +77,23 @@ echo ">>> package-manager removed. Nix itself was not removed."
|
||||
/usr/lib/package-manager/
|
||||
|
||||
%changelog
|
||||
* Tue Dec 09 2025 Kevin Veen-Birkenbach <kevin@veen.world> - 0.7.7-1
|
||||
- Added TEST_PATTERN parameter to execute dedicated tests
|
||||
|
||||
* Tue Dec 09 2025 Kevin Veen-Birkenbach <kevin@veen.world> - 0.7.6-1
|
||||
- Fixed pull --preview bug in e2e test
|
||||
|
||||
* Tue Dec 09 2025 Kevin Veen-Birkenbach <kevin@veen.world> - 0.7.5-1
|
||||
- Fixed wrong directory permissions for nix
|
||||
|
||||
* Tue Dec 09 2025 Kevin Veen-Birkenbach <kevin@veen.world> - 0.7.4-1
|
||||
- Fixed missing build in test workflow -> Tests pass now
|
||||
|
||||
* Tue Dec 09 2025 Kevin Veen-Birkenbach <kevin@veen.world> - 0.7.3-1
|
||||
- Fixed bug: Ignored packages are now ignored
|
||||
|
||||
* Tue Dec 09 2025 Kevin Veen-Birkenbach <kevin@veen.world> - 0.7.2-1
|
||||
- Implemented Changelog Support for Fedora and Debian
|
||||
|
||||
* Sat Dec 06 2025 Kevin Veen-Birkenbach <info@veen.world> - 0.1.1-1
|
||||
- Initial RPM packaging for package-manager
|
||||
|
||||
@@ -1,761 +0,0 @@
|
||||
# pkgmgr/release.py
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
"""
|
||||
pkgmgr/release.py
|
||||
|
||||
Release helper for pkgmgr.
|
||||
|
||||
Responsibilities (Milestone 7):
|
||||
- Determine the next semantic version based on existing Git tags.
|
||||
- Update pyproject.toml with the new version.
|
||||
- Update additional packaging files (flake.nix, PKGBUILD,
|
||||
debian/changelog, RPM spec) where present.
|
||||
- Prepend a basic entry to CHANGELOG.md.
|
||||
- Commit, tag, and push the release on the current branch.
|
||||
|
||||
Additional behaviour:
|
||||
- If `preview=True` (from --preview), no files are written and no
|
||||
Git commands are executed. Instead, a detailed summary of the
|
||||
planned changes and commands is printed.
|
||||
- If `preview=False` and not forced, the release is executed in two
|
||||
phases:
|
||||
1) Preview-only run (dry-run).
|
||||
2) Interactive confirmation, then real release if confirmed.
|
||||
This confirmation can be skipped with the `force=True` flag.
|
||||
- If `close=True` is used and the current branch is not main/master,
|
||||
the branch will be closed via branch_commands.close_branch() after
|
||||
a successful release.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
from datetime import date, datetime
|
||||
from typing import Optional, Tuple
|
||||
|
||||
from pkgmgr.core.git import get_tags, get_current_branch, GitError
|
||||
from pkgmgr.actions.branch import close_branch
|
||||
from pkgmgr.core.version.semver import (
|
||||
SemVer,
|
||||
find_latest_version,
|
||||
bump_major,
|
||||
bump_minor,
|
||||
bump_patch,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers for Git + version discovery
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _determine_current_version() -> SemVer:
|
||||
"""
|
||||
Determine the current semantic version from Git tags.
|
||||
|
||||
Behaviour:
|
||||
- If there are no tags or no SemVer-compatible tags, return 0.0.0.
|
||||
- Otherwise, use the latest SemVer tag as current version.
|
||||
"""
|
||||
tags = get_tags()
|
||||
if not tags:
|
||||
return SemVer(0, 0, 0)
|
||||
|
||||
latest = find_latest_version(tags)
|
||||
if latest is None:
|
||||
return SemVer(0, 0, 0)
|
||||
|
||||
_tag, ver = latest
|
||||
return ver
|
||||
|
||||
|
||||
def _bump_semver(current: SemVer, release_type: str) -> SemVer:
|
||||
"""
|
||||
Bump the given SemVer according to the release type.
|
||||
|
||||
release_type must be one of: "major", "minor", "patch".
|
||||
"""
|
||||
if release_type == "major":
|
||||
return bump_major(current)
|
||||
if release_type == "minor":
|
||||
return bump_minor(current)
|
||||
if release_type == "patch":
|
||||
return bump_patch(current)
|
||||
|
||||
raise ValueError(f"Unknown release type: {release_type!r}")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Low-level Git command helper
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _run_git_command(cmd: str) -> None:
|
||||
"""
|
||||
Run a Git (or shell) command with basic error reporting.
|
||||
|
||||
The command is executed via the shell, primarily for readability
|
||||
when printed (as in 'git commit -am "msg"').
|
||||
"""
|
||||
print(f"[GIT] {cmd}")
|
||||
try:
|
||||
subprocess.run(cmd, shell=True, check=True)
|
||||
except subprocess.CalledProcessError as exc:
|
||||
print(f"[ERROR] Git command failed: {cmd}")
|
||||
print(f" Exit code: {exc.returncode}")
|
||||
if exc.stdout:
|
||||
print("--- stdout ---")
|
||||
print(exc.stdout)
|
||||
if exc.stderr:
|
||||
print("--- stderr ---")
|
||||
print(exc.stderr)
|
||||
raise GitError(f"Git command failed: {cmd}") from exc
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Editor helper for interactive changelog messages
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _open_editor_for_changelog(initial_message: Optional[str] = None) -> str:
|
||||
"""
|
||||
Open $EDITOR (fallback 'nano') so the user can enter a changelog message.
|
||||
|
||||
The temporary file is pre-filled with commented instructions and an
|
||||
optional initial_message. Lines starting with '#' are ignored when the
|
||||
message is read back.
|
||||
|
||||
Returns the final message (may be empty string if user leaves it blank).
|
||||
"""
|
||||
editor = os.environ.get("EDITOR", "nano")
|
||||
|
||||
with tempfile.NamedTemporaryFile(
|
||||
mode="w+",
|
||||
delete=False,
|
||||
encoding="utf-8",
|
||||
) as tmp:
|
||||
tmp_path = tmp.name
|
||||
tmp.write(
|
||||
"# Write the changelog entry for this release.\n"
|
||||
"# Lines starting with '#' will be ignored.\n"
|
||||
"# Empty result will fall back to a generic message.\n\n"
|
||||
)
|
||||
if initial_message:
|
||||
tmp.write(initial_message.strip() + "\n")
|
||||
tmp.flush()
|
||||
|
||||
try:
|
||||
subprocess.call([editor, tmp_path])
|
||||
except FileNotFoundError:
|
||||
print(
|
||||
f"[WARN] Editor {editor!r} not found; proceeding without "
|
||||
"interactive changelog message."
|
||||
)
|
||||
|
||||
try:
|
||||
with open(tmp_path, "r", encoding="utf-8") as f:
|
||||
content = f.read()
|
||||
finally:
|
||||
try:
|
||||
os.remove(tmp_path)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
lines = [
|
||||
line for line in content.splitlines()
|
||||
if not line.strip().startswith("#")
|
||||
]
|
||||
return "\n".join(lines).strip()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# File update helpers (pyproject + extra packaging + changelog)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def update_pyproject_version(
|
||||
pyproject_path: str,
|
||||
new_version: str,
|
||||
preview: bool = False,
|
||||
) -> None:
|
||||
"""
|
||||
Update the version in pyproject.toml with the new version.
|
||||
|
||||
The function looks for a line matching:
|
||||
|
||||
version = "X.Y.Z"
|
||||
|
||||
and replaces the version part with the given new_version string.
|
||||
"""
|
||||
try:
|
||||
with open(pyproject_path, "r", encoding="utf-8") as f:
|
||||
content = f.read()
|
||||
except FileNotFoundError:
|
||||
print(f"[ERROR] pyproject.toml not found at: {pyproject_path}")
|
||||
sys.exit(1)
|
||||
|
||||
pattern = r'^(version\s*=\s*")([^"]+)(")'
|
||||
new_content, count = re.subn(
|
||||
pattern,
|
||||
lambda m: f'{m.group(1)}{new_version}{m.group(3)}',
|
||||
content,
|
||||
flags=re.MULTILINE,
|
||||
)
|
||||
|
||||
if count == 0:
|
||||
print("[ERROR] Could not find version line in pyproject.toml")
|
||||
sys.exit(1)
|
||||
|
||||
if preview:
|
||||
print(f"[PREVIEW] Would update pyproject.toml version to {new_version}")
|
||||
return
|
||||
|
||||
with open(pyproject_path, "w", encoding="utf-8") as f:
|
||||
f.write(new_content)
|
||||
|
||||
print(f"Updated pyproject.toml version to {new_version}")
|
||||
|
||||
|
||||
def update_flake_version(
|
||||
flake_path: str,
|
||||
new_version: str,
|
||||
preview: bool = False,
|
||||
) -> None:
|
||||
"""
|
||||
Update the version in flake.nix, if present.
|
||||
"""
|
||||
if not os.path.exists(flake_path):
|
||||
print("[INFO] flake.nix not found, skipping.")
|
||||
return
|
||||
|
||||
try:
|
||||
with open(flake_path, "r", encoding="utf-8") as f:
|
||||
content = f.read()
|
||||
except Exception as exc:
|
||||
print(f"[WARN] Could not read flake.nix: {exc}")
|
||||
return
|
||||
|
||||
pattern = r'(version\s*=\s*")([^"]+)(")'
|
||||
new_content, count = re.subn(
|
||||
pattern,
|
||||
lambda m: f'{m.group(1)}{new_version}{m.group(3)}',
|
||||
content,
|
||||
)
|
||||
|
||||
if count == 0:
|
||||
print("[WARN] No version assignment found in flake.nix, skipping.")
|
||||
return
|
||||
|
||||
if preview:
|
||||
print(f"[PREVIEW] Would update flake.nix version to {new_version}")
|
||||
return
|
||||
|
||||
with open(flake_path, "w", encoding="utf-8") as f:
|
||||
f.write(new_content)
|
||||
|
||||
print(f"Updated flake.nix version to {new_version}")
|
||||
|
||||
|
||||
def update_pkgbuild_version(
|
||||
pkgbuild_path: str,
|
||||
new_version: str,
|
||||
preview: bool = False,
|
||||
) -> None:
|
||||
"""
|
||||
Update the version in PKGBUILD, if present.
|
||||
|
||||
Expects:
|
||||
pkgver=1.2.3
|
||||
pkgrel=1
|
||||
"""
|
||||
if not os.path.exists(pkgbuild_path):
|
||||
print("[INFO] PKGBUILD not found, skipping.")
|
||||
return
|
||||
|
||||
try:
|
||||
with open(pkgbuild_path, "r", encoding="utf-8") as f:
|
||||
content = f.read()
|
||||
except Exception as exc:
|
||||
print(f"[WARN] Could not read PKGBUILD: {exc}")
|
||||
return
|
||||
|
||||
ver_pattern = r"^(pkgver\s*=\s*)(.+)$"
|
||||
new_content, ver_count = re.subn(
|
||||
ver_pattern,
|
||||
lambda m: f"{m.group(1)}{new_version}",
|
||||
content,
|
||||
flags=re.MULTILINE,
|
||||
)
|
||||
|
||||
if ver_count == 0:
|
||||
print("[WARN] No pkgver line found in PKGBUILD.")
|
||||
new_content = content
|
||||
|
||||
rel_pattern = r"^(pkgrel\s*=\s*)(.+)$"
|
||||
new_content, rel_count = re.subn(
|
||||
rel_pattern,
|
||||
lambda m: f"{m.group(1)}1",
|
||||
new_content,
|
||||
flags=re.MULTILINE,
|
||||
)
|
||||
|
||||
if rel_count == 0:
|
||||
print("[WARN] No pkgrel line found in PKGBUILD.")
|
||||
|
||||
if preview:
|
||||
print(f"[PREVIEW] Would update PKGBUILD to pkgver={new_version}, pkgrel=1")
|
||||
return
|
||||
|
||||
with open(pkgbuild_path, "w", encoding="utf-8") as f:
|
||||
f.write(new_content)
|
||||
|
||||
print(f"Updated PKGBUILD to pkgver={new_version}, pkgrel=1")
|
||||
|
||||
|
||||
def update_spec_version(
|
||||
spec_path: str,
|
||||
new_version: str,
|
||||
preview: bool = False,
|
||||
) -> None:
|
||||
"""
|
||||
Update the version in an RPM spec file, if present.
|
||||
"""
|
||||
if not os.path.exists(spec_path):
|
||||
print("[INFO] RPM spec file not found, skipping.")
|
||||
return
|
||||
|
||||
try:
|
||||
with open(spec_path, "r", encoding="utf-8") as f:
|
||||
content = f.read()
|
||||
except Exception as exc:
|
||||
print(f"[WARN] Could not read spec file: {exc}")
|
||||
return
|
||||
|
||||
ver_pattern = r"^(Version:\s*)(.+)$"
|
||||
new_content, ver_count = re.subn(
|
||||
ver_pattern,
|
||||
lambda m: f"{m.group(1)}{new_version}",
|
||||
content,
|
||||
flags=re.MULTILINE,
|
||||
)
|
||||
|
||||
if ver_count == 0:
|
||||
print("[WARN] No 'Version:' line found in spec file.")
|
||||
|
||||
rel_pattern = r"^(Release:\s*)(.+)$"
|
||||
|
||||
def _release_repl(m: re.Match[str]) -> str: # type: ignore[name-defined]
|
||||
rest = m.group(2).strip()
|
||||
match = re.match(r"^(\d+)(.*)$", rest)
|
||||
if match:
|
||||
suffix = match.group(2)
|
||||
else:
|
||||
suffix = ""
|
||||
return f"{m.group(1)}1{suffix}"
|
||||
|
||||
new_content, rel_count = re.subn(
|
||||
rel_pattern,
|
||||
_release_repl,
|
||||
new_content,
|
||||
flags=re.MULTILINE,
|
||||
)
|
||||
|
||||
if rel_count == 0:
|
||||
print("[WARN] No 'Release:' line found in spec file.")
|
||||
|
||||
if preview:
|
||||
print(
|
||||
f"[PREVIEW] Would update spec file "
|
||||
f"{os.path.basename(spec_path)} to Version: {new_version}, Release: 1..."
|
||||
)
|
||||
return
|
||||
|
||||
with open(spec_path, "w", encoding="utf-8") as f:
|
||||
f.write(new_content)
|
||||
|
||||
print(
|
||||
f"Updated spec file {os.path.basename(spec_path)} "
|
||||
f"to Version: {new_version}, Release: 1..."
|
||||
)
|
||||
|
||||
|
||||
def update_changelog(
|
||||
changelog_path: str,
|
||||
new_version: str,
|
||||
message: Optional[str] = None,
|
||||
preview: bool = False,
|
||||
) -> str:
|
||||
"""
|
||||
Prepend a new release section to CHANGELOG.md with the new version,
|
||||
current date, and a message.
|
||||
"""
|
||||
today = date.today().isoformat()
|
||||
|
||||
if message is None:
|
||||
if preview:
|
||||
message = "Automated release."
|
||||
else:
|
||||
print(
|
||||
"\n[INFO] No release message provided, opening editor for "
|
||||
"changelog entry...\n"
|
||||
)
|
||||
editor_message = _open_editor_for_changelog()
|
||||
if not editor_message:
|
||||
message = "Automated release."
|
||||
else:
|
||||
message = editor_message
|
||||
|
||||
header = f"## [{new_version}] - {today}\n"
|
||||
header += f"\n* {message}\n\n"
|
||||
|
||||
if os.path.exists(changelog_path):
|
||||
try:
|
||||
with open(changelog_path, "r", encoding="utf-8") as f:
|
||||
changelog = f.read()
|
||||
except Exception as exc:
|
||||
print(f"[WARN] Could not read existing CHANGELOG.md: {exc}")
|
||||
changelog = ""
|
||||
else:
|
||||
changelog = ""
|
||||
|
||||
new_changelog = header + "\n" + changelog if changelog else header
|
||||
|
||||
print("\n================ CHANGELOG ENTRY ================")
|
||||
print(header.rstrip())
|
||||
print("=================================================\n")
|
||||
|
||||
if preview:
|
||||
print(f"[PREVIEW] Would prepend new entry for {new_version} to CHANGELOG.md")
|
||||
return message
|
||||
|
||||
with open(changelog_path, "w", encoding="utf-8") as f:
|
||||
f.write(new_changelog)
|
||||
|
||||
print(f"Updated CHANGELOG.md with version {new_version}")
|
||||
|
||||
return message
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Debian changelog helpers (with Git config fallback for maintainer)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _get_git_config_value(key: str) -> Optional[str]:
|
||||
"""
|
||||
Try to read a value from `git config --get <key>`.
|
||||
"""
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["git", "config", "--get", key],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
check=False,
|
||||
)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
value = result.stdout.strip()
|
||||
return value or None
|
||||
|
||||
|
||||
def _get_debian_author() -> Tuple[str, str]:
|
||||
"""
|
||||
Determine the maintainer name/email for debian/changelog entries.
|
||||
"""
|
||||
name = os.environ.get("DEBFULLNAME")
|
||||
email = os.environ.get("DEBEMAIL")
|
||||
|
||||
if not name:
|
||||
name = os.environ.get("GIT_AUTHOR_NAME")
|
||||
if not email:
|
||||
email = os.environ.get("GIT_AUTHOR_EMAIL")
|
||||
|
||||
if not name:
|
||||
name = _get_git_config_value("user.name")
|
||||
if not email:
|
||||
email = _get_git_config_value("user.email")
|
||||
|
||||
if not name:
|
||||
name = "Unknown Maintainer"
|
||||
if not email:
|
||||
email = "unknown@example.com"
|
||||
|
||||
return name, email
|
||||
|
||||
|
||||
def update_debian_changelog(
|
||||
debian_changelog_path: str,
|
||||
package_name: str,
|
||||
new_version: str,
|
||||
message: Optional[str] = None,
|
||||
preview: bool = False,
|
||||
) -> None:
|
||||
"""
|
||||
Prepend a new entry to debian/changelog, if it exists.
|
||||
"""
|
||||
if not os.path.exists(debian_changelog_path):
|
||||
print("[INFO] debian/changelog not found, skipping.")
|
||||
return
|
||||
|
||||
debian_version = f"{new_version}-1"
|
||||
now = datetime.now().astimezone()
|
||||
date_str = now.strftime("%a, %d %b %Y %H:%M:%S %z")
|
||||
|
||||
author_name, author_email = _get_debian_author()
|
||||
|
||||
first_line = f"{package_name} ({debian_version}) unstable; urgency=medium"
|
||||
body_line = message.strip() if message else f"Automated release {new_version}."
|
||||
stanza = (
|
||||
f"{first_line}\n\n"
|
||||
f" * {body_line}\n\n"
|
||||
f" -- {author_name} <{author_email}> {date_str}\n\n"
|
||||
)
|
||||
|
||||
if preview:
|
||||
print(
|
||||
"[PREVIEW] Would prepend the following stanza to debian/changelog:\n"
|
||||
f"{stanza}"
|
||||
)
|
||||
return
|
||||
|
||||
try:
|
||||
with open(debian_changelog_path, "r", encoding="utf-8") as f:
|
||||
existing = f.read()
|
||||
except Exception as exc:
|
||||
print(f"[WARN] Could not read debian/changelog: {exc}")
|
||||
existing = ""
|
||||
|
||||
new_content = stanza + existing
|
||||
|
||||
with open(debian_changelog_path, "w", encoding="utf-8") as f:
|
||||
f.write(new_content)
|
||||
|
||||
print(f"Updated debian/changelog with version {debian_version}")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Internal implementation (single-phase, preview or real)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _release_impl(
|
||||
pyproject_path: str = "pyproject.toml",
|
||||
changelog_path: str = "CHANGELOG.md",
|
||||
release_type: str = "patch",
|
||||
message: Optional[str] = None,
|
||||
preview: bool = False,
|
||||
close: bool = False,
|
||||
) -> None:
|
||||
"""
|
||||
Internal implementation that performs a single-phase release.
|
||||
"""
|
||||
current_ver = _determine_current_version()
|
||||
new_ver = _bump_semver(current_ver, release_type)
|
||||
new_ver_str = str(new_ver)
|
||||
new_tag = new_ver.to_tag(with_prefix=True)
|
||||
|
||||
mode = "PREVIEW" if preview else "REAL"
|
||||
print(f"Release mode: {mode}")
|
||||
print(f"Current version: {current_ver}")
|
||||
print(f"New version: {new_ver_str} ({release_type})")
|
||||
|
||||
repo_root = os.path.dirname(os.path.abspath(pyproject_path))
|
||||
|
||||
update_pyproject_version(pyproject_path, new_ver_str, preview=preview)
|
||||
changelog_message = update_changelog(
|
||||
changelog_path,
|
||||
new_ver_str,
|
||||
message=message,
|
||||
preview=preview,
|
||||
)
|
||||
|
||||
flake_path = os.path.join(repo_root, "flake.nix")
|
||||
update_flake_version(flake_path, new_ver_str, preview=preview)
|
||||
|
||||
pkgbuild_path = os.path.join(repo_root, "PKGBUILD")
|
||||
update_pkgbuild_version(pkgbuild_path, new_ver_str, preview=preview)
|
||||
|
||||
spec_path = os.path.join(repo_root, "package-manager.spec")
|
||||
update_spec_version(spec_path, new_ver_str, preview=preview)
|
||||
|
||||
effective_message: Optional[str] = message
|
||||
if effective_message is None and isinstance(changelog_message, str):
|
||||
if changelog_message.strip():
|
||||
effective_message = changelog_message.strip()
|
||||
|
||||
debian_changelog_path = os.path.join(repo_root, "debian", "changelog")
|
||||
package_name = os.path.basename(repo_root) or "package-manager"
|
||||
update_debian_changelog(
|
||||
debian_changelog_path,
|
||||
package_name=package_name,
|
||||
new_version=new_ver_str,
|
||||
message=effective_message,
|
||||
preview=preview,
|
||||
)
|
||||
|
||||
commit_msg = f"Release version {new_ver_str}"
|
||||
tag_msg = effective_message or commit_msg
|
||||
|
||||
try:
|
||||
branch = get_current_branch() or "main"
|
||||
except GitError:
|
||||
branch = "main"
|
||||
print(f"Releasing on branch: {branch}")
|
||||
|
||||
files_to_add = [
|
||||
pyproject_path,
|
||||
changelog_path,
|
||||
flake_path,
|
||||
pkgbuild_path,
|
||||
spec_path,
|
||||
debian_changelog_path,
|
||||
]
|
||||
existing_files = [p for p in files_to_add if p and os.path.exists(p)]
|
||||
|
||||
if preview:
|
||||
for path in existing_files:
|
||||
print(f"[PREVIEW] Would run: git add {path}")
|
||||
print(f'[PREVIEW] Would run: git commit -am "{commit_msg}"')
|
||||
print(f'[PREVIEW] Would run: git tag -a {new_tag} -m "{tag_msg}"')
|
||||
print(f"[PREVIEW] Would run: git push origin {branch}")
|
||||
print("[PREVIEW] Would run: git push origin --tags")
|
||||
|
||||
if close and branch not in ("main", "master"):
|
||||
print(
|
||||
f"[PREVIEW] Would also close branch {branch} after the release "
|
||||
"(close=True and branch is not main/master)."
|
||||
)
|
||||
elif close:
|
||||
print(
|
||||
f"[PREVIEW] close=True but current branch is {branch}; "
|
||||
"no branch would be closed."
|
||||
)
|
||||
|
||||
print("Preview completed. No changes were made.")
|
||||
return
|
||||
|
||||
for path in existing_files:
|
||||
_run_git_command(f"git add {path}")
|
||||
|
||||
_run_git_command(f'git commit -am "{commit_msg}"')
|
||||
_run_git_command(f'git tag -a {new_tag} -m "{tag_msg}"')
|
||||
_run_git_command(f"git push origin {branch}")
|
||||
_run_git_command("git push origin --tags")
|
||||
|
||||
print(f"Release {new_ver_str} completed.")
|
||||
|
||||
if close:
|
||||
if branch in ("main", "master"):
|
||||
print(
|
||||
f"[INFO] close=True but current branch is {branch}; "
|
||||
"nothing to close."
|
||||
)
|
||||
return
|
||||
|
||||
print(
|
||||
f"[INFO] Closing branch {branch} after successful release "
|
||||
"(close=True and branch is not main/master)..."
|
||||
)
|
||||
try:
|
||||
close_branch(name=branch, base_branch="main", cwd=".")
|
||||
except Exception as exc: # pragma: no cover
|
||||
print(f"[WARN] Failed to close branch {branch} automatically: {exc}")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Public release entry point
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def release(
|
||||
pyproject_path: str = "pyproject.toml",
|
||||
changelog_path: str = "CHANGELOG.md",
|
||||
release_type: str = "patch",
|
||||
message: Optional[str] = None,
|
||||
preview: bool = False,
|
||||
force: bool = False,
|
||||
close: bool = False,
|
||||
) -> None:
|
||||
"""
|
||||
High-level release entry point.
|
||||
|
||||
Modes:
|
||||
|
||||
- preview=True:
|
||||
* Single-phase PREVIEW only.
|
||||
|
||||
- preview=False, force=True:
|
||||
* Single-phase REAL release, no interactive preview.
|
||||
|
||||
- preview=False, force=False:
|
||||
* Two-phase flow (intended default for interactive CLI use).
|
||||
"""
|
||||
if preview:
|
||||
_release_impl(
|
||||
pyproject_path=pyproject_path,
|
||||
changelog_path=changelog_path,
|
||||
release_type=release_type,
|
||||
message=message,
|
||||
preview=True,
|
||||
close=close,
|
||||
)
|
||||
return
|
||||
|
||||
if force:
|
||||
_release_impl(
|
||||
pyproject_path=pyproject_path,
|
||||
changelog_path=changelog_path,
|
||||
release_type=release_type,
|
||||
message=message,
|
||||
preview=False,
|
||||
close=close,
|
||||
)
|
||||
return
|
||||
|
||||
if not sys.stdin.isatty():
|
||||
_release_impl(
|
||||
pyproject_path=pyproject_path,
|
||||
changelog_path=changelog_path,
|
||||
release_type=release_type,
|
||||
message=message,
|
||||
preview=False,
|
||||
close=close,
|
||||
)
|
||||
return
|
||||
|
||||
print("[INFO] Running preview before actual release...\n")
|
||||
_release_impl(
|
||||
pyproject_path=pyproject_path,
|
||||
changelog_path=changelog_path,
|
||||
release_type=release_type,
|
||||
message=message,
|
||||
preview=True,
|
||||
close=close,
|
||||
)
|
||||
|
||||
try:
|
||||
answer = input("Proceed with the actual release? [y/N]: ").strip().lower()
|
||||
except (EOFError, KeyboardInterrupt):
|
||||
print("\n[INFO] Release aborted (no confirmation).")
|
||||
return
|
||||
|
||||
if answer not in ("y", "yes"):
|
||||
print("Release aborted by user. No changes were made.")
|
||||
return
|
||||
|
||||
print("\n[INFO] Running REAL release...\n")
|
||||
_release_impl(
|
||||
pyproject_path=pyproject_path,
|
||||
changelog_path=changelog_path,
|
||||
release_type=release_type,
|
||||
message=message,
|
||||
preview=False,
|
||||
close=close,
|
||||
)
|
||||
@@ -49,6 +49,7 @@ from .files import (
|
||||
update_spec_version,
|
||||
update_changelog,
|
||||
update_debian_changelog,
|
||||
update_spec_changelog,
|
||||
)
|
||||
|
||||
|
||||
@@ -98,6 +99,8 @@ def _release_impl(
|
||||
spec_path = os.path.join(repo_root, "package-manager.spec")
|
||||
update_spec_version(spec_path, new_ver_str, preview=preview)
|
||||
|
||||
# Determine a single effective_message to be reused across all
|
||||
# changelog targets (project, Debian, Fedora).
|
||||
effective_message: Optional[str] = message
|
||||
if effective_message is None and isinstance(changelog_message, str):
|
||||
if changelog_message.strip():
|
||||
@@ -105,6 +108,8 @@ def _release_impl(
|
||||
|
||||
debian_changelog_path = os.path.join(repo_root, "debian", "changelog")
|
||||
package_name = os.path.basename(repo_root) or "package-manager"
|
||||
|
||||
# Debian changelog
|
||||
update_debian_changelog(
|
||||
debian_changelog_path,
|
||||
package_name=package_name,
|
||||
@@ -113,6 +118,15 @@ def _release_impl(
|
||||
preview=preview,
|
||||
)
|
||||
|
||||
# Fedora / RPM %changelog
|
||||
update_spec_changelog(
|
||||
spec_path=spec_path,
|
||||
package_name=package_name,
|
||||
new_version=new_ver_str,
|
||||
message=effective_message,
|
||||
preview=preview,
|
||||
)
|
||||
|
||||
commit_msg = f"Release version {new_ver_str}"
|
||||
tag_msg = effective_message or commit_msg
|
||||
|
||||
@@ -171,8 +185,15 @@ def _release_impl(
|
||||
run_git_command("git push origin --tags")
|
||||
|
||||
# Move 'latest' to the new release tag so the newest SemVer is always
|
||||
# marked as latest.
|
||||
update_latest_tag(new_tag, preview=False)
|
||||
# marked as latest. This is best-effort and must not break the release.
|
||||
try:
|
||||
update_latest_tag(new_tag, preview=False)
|
||||
except GitError as exc: # pragma: no cover
|
||||
print(
|
||||
f"[WARN] Failed to update floating 'latest' tag for {new_tag}: {exc}\n"
|
||||
"[WARN] The release itself completed successfully; only the "
|
||||
"'latest' tag was not updated."
|
||||
)
|
||||
|
||||
print(f"Release {new_ver_str} completed.")
|
||||
|
||||
|
||||
@@ -8,7 +8,10 @@ Responsibilities:
|
||||
- Update pyproject.toml with the new version.
|
||||
- Update flake.nix, PKGBUILD, RPM spec files where present.
|
||||
- Prepend release entries to CHANGELOG.md.
|
||||
- Maintain debian/changelog entries, including maintainer metadata.
|
||||
- Maintain distribution-specific changelog files:
|
||||
* debian/changelog
|
||||
* RPM spec %changelog section
|
||||
including maintainer metadata where applicable.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
@@ -442,3 +445,82 @@ def update_debian_changelog(
|
||||
f.write(new_content)
|
||||
|
||||
print(f"Updated debian/changelog with version {debian_version}")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fedora / RPM spec %changelog helper
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def update_spec_changelog(
|
||||
spec_path: str,
|
||||
package_name: str,
|
||||
new_version: str,
|
||||
message: Optional[str] = None,
|
||||
preview: bool = False,
|
||||
) -> None:
|
||||
"""
|
||||
Prepend a new entry to the %changelog section of an RPM spec file,
|
||||
if present.
|
||||
|
||||
Typical RPM-style entry:
|
||||
|
||||
* Tue Dec 09 2025 John Doe <john@example.com> - 0.5.1-1
|
||||
- Your changelog message
|
||||
"""
|
||||
if not os.path.exists(spec_path):
|
||||
print("[INFO] RPM spec file not found, skipping spec changelog update.")
|
||||
return
|
||||
|
||||
try:
|
||||
with open(spec_path, "r", encoding="utf-8") as f:
|
||||
content = f.read()
|
||||
except Exception as exc:
|
||||
print(f"[WARN] Could not read spec file for changelog update: {exc}")
|
||||
return
|
||||
|
||||
debian_version = f"{new_version}-1"
|
||||
now = datetime.now().astimezone()
|
||||
date_str = now.strftime("%a %b %d %Y")
|
||||
|
||||
# Reuse Debian maintainer discovery for author name/email.
|
||||
author_name, author_email = _get_debian_author()
|
||||
|
||||
body_line = message.strip() if message else f"Automated release {new_version}."
|
||||
|
||||
stanza = (
|
||||
f"* {date_str} {author_name} <{author_email}> - {debian_version}\n"
|
||||
f"- {body_line}\n\n"
|
||||
)
|
||||
|
||||
marker = "%changelog"
|
||||
idx = content.find(marker)
|
||||
|
||||
if idx == -1:
|
||||
# No %changelog section yet: append one at the end.
|
||||
new_content = content.rstrip() + "\n\n%changelog\n" + stanza
|
||||
else:
|
||||
# Insert stanza right after the %changelog line.
|
||||
before = content[: idx + len(marker)]
|
||||
after = content[idx + len(marker) :]
|
||||
new_content = before + "\n" + stanza + after.lstrip("\n")
|
||||
|
||||
if preview:
|
||||
print(
|
||||
"[PREVIEW] Would update RPM %changelog section with the following "
|
||||
"stanza:\n"
|
||||
f"{stanza}"
|
||||
)
|
||||
return
|
||||
|
||||
try:
|
||||
with open(spec_path, "w", encoding="utf-8") as f:
|
||||
f.write(new_content)
|
||||
except Exception as exc:
|
||||
print(f"[WARN] Failed to write updated spec changelog section: {exc}")
|
||||
return
|
||||
|
||||
print(
|
||||
f"Updated RPM %changelog section in {os.path.basename(spec_path)} "
|
||||
f"for {package_name} {debian_version}"
|
||||
)
|
||||
|
||||
@@ -71,12 +71,25 @@ def sync_branch_with_remote(branch: str, preview: bool = False) -> None:
|
||||
def update_latest_tag(new_tag: str, preview: bool = False) -> None:
|
||||
"""
|
||||
Move the floating 'latest' tag to the newly created release tag.
|
||||
|
||||
Implementation details:
|
||||
- We explicitly dereference the tag object via `<tag>^{}` so that
|
||||
'latest' always points at the underlying commit, not at another tag.
|
||||
- We create/update 'latest' as an annotated tag with a short message so
|
||||
Git configurations that enforce annotated/signed tags do not fail
|
||||
with "no tag message".
|
||||
"""
|
||||
print(f"[INFO] Updating 'latest' tag to point at {new_tag}...")
|
||||
target_ref = f"{new_tag}^{{}}"
|
||||
print(f"[INFO] Updating 'latest' tag to point at {new_tag} (commit {target_ref})...")
|
||||
|
||||
if preview:
|
||||
print(f"[PREVIEW] Would run: git tag -f latest {new_tag}")
|
||||
print(f"[PREVIEW] Would run: git tag -f -a latest {target_ref} "
|
||||
f'-m "Floating latest tag for {new_tag}"')
|
||||
print("[PREVIEW] Would run: git push origin latest --force")
|
||||
return
|
||||
|
||||
run_git_command(f"git tag -f latest {new_tag}")
|
||||
run_git_command(
|
||||
f'git tag -f -a latest {target_ref} '
|
||||
f'-m "Floating latest tag for {new_tag}"'
|
||||
)
|
||||
run_git_command("git push origin latest --force")
|
||||
|
||||
@@ -1,35 +1,57 @@
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
from pkgmgr.core.repository.identifier import get_repo_identifier
|
||||
from pkgmgr.core.repository.dir import get_repo_dir
|
||||
from pkgmgr.core.repository.verify import verify_repository
|
||||
|
||||
|
||||
def pull_with_verification(
|
||||
selected_repos,
|
||||
repositories_base_dir,
|
||||
all_repos,
|
||||
extra_args,
|
||||
no_verification,
|
||||
preview:bool):
|
||||
preview: bool,
|
||||
) -> None:
|
||||
"""
|
||||
Executes "git pull" for each repository with verification.
|
||||
|
||||
Uses the verify_repository function in "pull" mode.
|
||||
If verification fails (and verification info is set) and --no-verification is not enabled,
|
||||
the user is prompted to confirm the pull.
|
||||
Execute `git pull` for each repository with verification.
|
||||
|
||||
- Uses verify_repository() in "pull" mode.
|
||||
- If verification fails (and verification info is set) and
|
||||
--no-verification is not enabled, the user is prompted to confirm
|
||||
the pull.
|
||||
- In preview mode, no interactive prompts are performed and no
|
||||
Git commands are executed; only the would-be command is printed.
|
||||
"""
|
||||
for repo in selected_repos:
|
||||
repo_identifier = get_repo_identifier(repo, all_repos)
|
||||
repo_dir = get_repo_dir(repositories_base_dir, repo)
|
||||
|
||||
if not os.path.exists(repo_dir):
|
||||
print(f"Repository directory '{repo_dir}' not found for {repo_identifier}.")
|
||||
continue
|
||||
|
||||
verified_info = repo.get("verified")
|
||||
verified_ok, errors, commit_hash, signing_key = verify_repository(repo, repo_dir, mode="pull", no_verification=no_verification)
|
||||
verified_ok, errors, commit_hash, signing_key = verify_repository(
|
||||
repo,
|
||||
repo_dir,
|
||||
mode="pull",
|
||||
no_verification=no_verification,
|
||||
)
|
||||
|
||||
if not no_verification and verified_info and not verified_ok:
|
||||
# Only prompt the user if:
|
||||
# - we are NOT in preview mode
|
||||
# - verification is enabled
|
||||
# - the repo has verification info configured
|
||||
# - verification failed
|
||||
if (
|
||||
not preview
|
||||
and not no_verification
|
||||
and verified_info
|
||||
and not verified_ok
|
||||
):
|
||||
print(f"Warning: Verification failed for {repo_identifier}:")
|
||||
for err in errors:
|
||||
print(f" - {err}")
|
||||
@@ -37,12 +59,19 @@ def pull_with_verification(
|
||||
if choice != "y":
|
||||
continue
|
||||
|
||||
full_cmd = f"git pull {' '.join(extra_args)}"
|
||||
# Build the git pull command (include extra args if present)
|
||||
args_part = " ".join(extra_args) if extra_args else ""
|
||||
full_cmd = f"git pull{(' ' + args_part) if args_part else ''}"
|
||||
|
||||
if preview:
|
||||
# Preview mode: only show the command, do not execute or prompt.
|
||||
print(f"[Preview] In '{repo_dir}': {full_cmd}")
|
||||
else:
|
||||
print(f"Running in '{repo_dir}': {full_cmd}")
|
||||
result = subprocess.run(full_cmd, cwd=repo_dir, shell=True)
|
||||
if result.returncode != 0:
|
||||
print(f"'git pull' for {repo_identifier} failed with exit code {result.returncode}.")
|
||||
print(
|
||||
f"'git pull' for {repo_identifier} failed "
|
||||
f"with exit code {result.returncode}."
|
||||
)
|
||||
sys.exit(result.returncode)
|
||||
|
||||
@@ -8,6 +8,7 @@ import re
|
||||
from typing import Any, Dict, List, Sequence
|
||||
|
||||
from pkgmgr.core.repository.resolve import resolve_repos
|
||||
from pkgmgr.core.repository.ignored import filter_ignored
|
||||
|
||||
Repository = Dict[str, Any]
|
||||
|
||||
@@ -88,7 +89,7 @@ def _apply_filters(
|
||||
if not _match_pattern(ident_str, string_pattern):
|
||||
continue
|
||||
|
||||
# Category filter: nur echte Kategorien, KEINE Tags
|
||||
# Category filter: only real categories, NOT tags
|
||||
if category_patterns:
|
||||
cats: List[str] = []
|
||||
cats.extend(map(str, repo.get("category_files", [])))
|
||||
@@ -106,7 +107,7 @@ def _apply_filters(
|
||||
if not ok:
|
||||
continue
|
||||
|
||||
# Tag filter: ausschließlich YAML-Tags
|
||||
# Tag filter: YAML tags only
|
||||
if tag_patterns:
|
||||
tags: List[str] = list(map(str, repo.get("tags", [])))
|
||||
if not tags:
|
||||
@@ -124,16 +125,38 @@ def _apply_filters(
|
||||
|
||||
return filtered
|
||||
|
||||
|
||||
def _maybe_filter_ignored(args, repos: List[Repository]) -> List[Repository]:
|
||||
"""
|
||||
Apply ignore filtering unless the caller explicitly opted to include ignored
|
||||
repositories (via args.include_ignored).
|
||||
|
||||
Note: this helper is used only for *implicit* selections (all / filters /
|
||||
by-directory). For *explicit* identifiers we do NOT filter ignored repos,
|
||||
so the user can still target them directly if desired.
|
||||
"""
|
||||
include_ignored: bool = bool(getattr(args, "include_ignored", False))
|
||||
if include_ignored:
|
||||
return repos
|
||||
return filter_ignored(repos)
|
||||
|
||||
|
||||
def get_selected_repos(args, all_repositories: List[Repository]) -> List[Repository]:
|
||||
"""
|
||||
Compute the list of repositories selected by CLI arguments.
|
||||
|
||||
Modes:
|
||||
- If identifiers are given: select via resolve_repos() from all_repositories.
|
||||
- Else if any of --category/--string/--tag is used: start from all_repositories
|
||||
and apply filters.
|
||||
- Else if --all is set: select all_repositories.
|
||||
- Else: try to select the repository of the current working directory.
|
||||
Ignored repositories are *not* filtered here, so explicit identifiers
|
||||
always win.
|
||||
- Else if any of --category/--string/--tag is used: start from
|
||||
all_repositories, apply filters and then drop ignored repos.
|
||||
- Else if --all is set: select all_repositories and then drop ignored repos.
|
||||
- Else: try to select the repository of the current working directory
|
||||
and then drop it if it is ignored.
|
||||
|
||||
The ignore filter can be bypassed by setting args.include_ignored = True
|
||||
(e.g. via a CLI flag --include-ignored).
|
||||
"""
|
||||
identifiers: List[str] = getattr(args, "identifiers", []) or []
|
||||
use_all: bool = bool(getattr(args, "all", False))
|
||||
@@ -143,18 +166,25 @@ def get_selected_repos(args, all_repositories: List[Repository]) -> List[Reposit
|
||||
|
||||
has_filters = bool(category_patterns or string_pattern or tag_patterns)
|
||||
|
||||
# 1) Explicit identifiers win
|
||||
# 1) Explicit identifiers win and bypass ignore filtering
|
||||
if identifiers:
|
||||
base = resolve_repos(identifiers, all_repositories)
|
||||
return _apply_filters(base, string_pattern, category_patterns, tag_patterns)
|
||||
|
||||
# 2) Filter-only mode: start from all repositories
|
||||
if has_filters:
|
||||
return _apply_filters(list(all_repositories), string_pattern, category_patterns, tag_patterns)
|
||||
base = _apply_filters(
|
||||
list(all_repositories),
|
||||
string_pattern,
|
||||
category_patterns,
|
||||
tag_patterns,
|
||||
)
|
||||
return _maybe_filter_ignored(args, base)
|
||||
|
||||
# 3) --all (no filters): all repos
|
||||
if use_all:
|
||||
return list(all_repositories)
|
||||
base = list(all_repositories)
|
||||
return _maybe_filter_ignored(args, base)
|
||||
|
||||
# 4) Fallback: try to select repository of current working directory
|
||||
cwd = os.path.abspath(os.getcwd())
|
||||
@@ -164,7 +194,7 @@ def get_selected_repos(args, all_repositories: List[Repository]) -> List[Reposit
|
||||
if os.path.abspath(str(repo.get("directory", ""))) == cwd
|
||||
]
|
||||
if by_dir:
|
||||
return by_dir
|
||||
return _maybe_filter_ignored(args, by_dir)
|
||||
|
||||
# No specific match -> empty list
|
||||
return []
|
||||
|
||||
@@ -7,7 +7,7 @@ build-backend = "setuptools.build_meta"
|
||||
|
||||
[project]
|
||||
name = "package-manager"
|
||||
version = "0.7.0"
|
||||
version = "0.7.7"
|
||||
description = "Kevin's package-manager tool (pkgmgr)"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.11"
|
||||
|
||||
@@ -97,11 +97,32 @@ if [[ "${IN_CONTAINER}" -eq 1 && "${EUID:-0}" -eq 0 ]]; then
|
||||
useradd -m -r -g nixbld -s /usr/bin/bash nix
|
||||
fi
|
||||
|
||||
# Create /nix directory and hand it to nix user (prevents installer sudo prompt)
|
||||
# Ensure /nix exists and is writable by the "nix" user.
|
||||
#
|
||||
# In some base images (or previous runs), /nix may already exist and be
|
||||
# owned by root. In that case the Nix single-user installer will abort with:
|
||||
#
|
||||
# "directory /nix exists, but is not writable by you"
|
||||
#
|
||||
# To keep container runs idempotent and robust, we always enforce
|
||||
# ownership nix:nixbld here.
|
||||
if [[ ! -d /nix ]]; then
|
||||
echo "[init-nix] Creating /nix with owner nix:nixbld..."
|
||||
mkdir -m 0755 /nix
|
||||
chown nix:nixbld /nix
|
||||
else
|
||||
current_owner="$(stat -c '%U' /nix 2>/dev/null || echo '?')"
|
||||
current_group="$(stat -c '%G' /nix 2>/dev/null || echo '?')"
|
||||
if [[ "${current_owner}" != "nix" || "${current_group}" != "nixbld" ]]; then
|
||||
echo "[init-nix] /nix already exists with owner ${current_owner}:${current_group} – fixing to nix:nixbld..."
|
||||
chown -R nix:nixbld /nix
|
||||
else
|
||||
echo "[init-nix] /nix already exists with correct owner nix:nixbld."
|
||||
fi
|
||||
|
||||
if [[ ! -w /nix ]]; then
|
||||
echo "[init-nix] WARNING: /nix is still not writable after chown; Nix installer may fail."
|
||||
fi
|
||||
fi
|
||||
|
||||
# Run Nix single-user installer as "nix"
|
||||
|
||||
@@ -18,6 +18,7 @@ for distro in $DISTROS; do
|
||||
$MOUNT_NIX \
|
||||
-v "pkgmgr_nix_cache:/root/.cache/nix" \
|
||||
-e PKGMGR_DEV=1 \
|
||||
-e TEST_PATTERN="${TEST_PATTERN}" \
|
||||
--workdir /src \
|
||||
--entrypoint bash \
|
||||
"package-manager-test-$distro" \
|
||||
@@ -51,6 +52,6 @@ for distro in $DISTROS; do
|
||||
nix develop .#default --no-write-lock-file -c \
|
||||
python3 -m unittest discover \
|
||||
-s /src/tests/e2e \
|
||||
-p "test_*.py";
|
||||
-p "$TEST_PATTERN";
|
||||
'
|
||||
done
|
||||
|
||||
@@ -10,6 +10,7 @@ docker run --rm \
|
||||
-v "pkgmgr_nix_cache:/root/.cache/nix" \
|
||||
--workdir /src \
|
||||
-e PKGMGR_DEV=1 \
|
||||
-e TEST_PATTERN="${TEST_PATTERN}" \
|
||||
--entrypoint bash \
|
||||
"package-manager-test-arch" \
|
||||
-c '
|
||||
@@ -19,5 +20,5 @@ docker run --rm \
|
||||
python -m unittest discover \
|
||||
-s tests/integration \
|
||||
-t /src \
|
||||
-p "test_*.py";
|
||||
-p "$TEST_PATTERN";
|
||||
'
|
||||
|
||||
@@ -10,6 +10,7 @@ docker run --rm \
|
||||
-v "pkgmgr_nix_cache:/root/.cache/nix" \
|
||||
--workdir /src \
|
||||
-e PKGMGR_DEV=1 \
|
||||
-e TEST_PATTERN="${TEST_PATTERN}" \
|
||||
--entrypoint bash \
|
||||
"package-manager-test-arch" \
|
||||
-c '
|
||||
@@ -19,5 +20,5 @@ docker run --rm \
|
||||
python -m unittest discover \
|
||||
-s tests/unit \
|
||||
-t /src \
|
||||
-p "test_*.py";
|
||||
-p "$TEST_PATTERN";
|
||||
'
|
||||
|
||||
@@ -35,7 +35,7 @@ def remove_pkgmgr_from_nix_profile() -> None:
|
||||
prints a descriptive format without an index column inside the container.
|
||||
|
||||
Instead, we directly try to remove possible names:
|
||||
- 'pkgmgr' (the actual name shown in `nix profile list`)
|
||||
- 'pkgmgr' (the actual name shown in `nix profile list`)
|
||||
- 'package-manager' (the name mentioned in Nix's own error hints)
|
||||
"""
|
||||
for spec in ("pkgmgr", "package-manager"):
|
||||
@@ -76,29 +76,47 @@ def pkgmgr_help_debug() -> None:
|
||||
print(f"returncode: {proc.returncode}")
|
||||
print("--- END ---\n")
|
||||
|
||||
if proc.returncode != 0:
|
||||
raise AssertionError(f"'pkgmgr --help' failed with exit code {proc.returncode}")
|
||||
|
||||
# Wichtig: Hier KEIN AssertionError mehr – das ist reine Debug-Ausgabe.
|
||||
# Falls du später hart testen willst, kannst du optional:
|
||||
# if proc.returncode != 0:
|
||||
# self.fail("...")
|
||||
# aber aktuell nur Sichtprüfung.
|
||||
# Important: this is **debug-only**. Do NOT fail the test here.
|
||||
# If you ever want to hard-assert on this, you can add an explicit
|
||||
# assertion in the test method instead of here.
|
||||
|
||||
|
||||
class TestIntegrationInstalPKGMGRShallow(unittest.TestCase):
|
||||
def test_install_pkgmgr_self_install(self) -> None:
|
||||
# Debug before cleanup
|
||||
nix_profile_list_debug("BEFORE CLEANUP")
|
||||
"""
|
||||
End-to-end test that runs "python main.py install pkgmgr ..." inside
|
||||
the test container.
|
||||
|
||||
# Cleanup: aggressively try to drop any pkgmgr/profile entries
|
||||
remove_pkgmgr_from_nix_profile()
|
||||
|
||||
# Debug after cleanup
|
||||
nix_profile_list_debug("AFTER CLEANUP")
|
||||
We isolate HOME into /tmp/pkgmgr-self-install so that:
|
||||
- ~/.config/pkgmgr points to an isolated test config area
|
||||
- ~/Repositories is owned by the current user inside the container
|
||||
(avoiding Nix's 'repository path is not owned by current user' error)
|
||||
"""
|
||||
# Use a dedicated HOME for this test to avoid permission/ownership issues
|
||||
temp_home = "/tmp/pkgmgr-self-install"
|
||||
os.makedirs(temp_home, exist_ok=True)
|
||||
|
||||
original_argv = sys.argv
|
||||
original_environ = os.environ.copy()
|
||||
|
||||
try:
|
||||
# Isolate HOME so that ~ expands to /tmp/pkgmgr-self-install
|
||||
os.environ["HOME"] = temp_home
|
||||
|
||||
# Optional: ensure XDG_* also use the temp HOME for extra isolation
|
||||
os.environ.setdefault("XDG_CONFIG_HOME", os.path.join(temp_home, ".config"))
|
||||
os.environ.setdefault("XDG_CACHE_HOME", os.path.join(temp_home, ".cache"))
|
||||
os.environ.setdefault("XDG_DATA_HOME", os.path.join(temp_home, ".local", "share"))
|
||||
|
||||
# Debug before cleanup
|
||||
nix_profile_list_debug("BEFORE CLEANUP")
|
||||
|
||||
# Cleanup: aggressively try to drop any pkgmgr/profile entries
|
||||
remove_pkgmgr_from_nix_profile()
|
||||
|
||||
# Debug after cleanup
|
||||
nix_profile_list_debug("AFTER CLEANUP")
|
||||
|
||||
sys.argv = [
|
||||
"python",
|
||||
"install",
|
||||
@@ -107,13 +125,18 @@ class TestIntegrationInstalPKGMGRShallow(unittest.TestCase):
|
||||
"shallow",
|
||||
"--no-verification",
|
||||
]
|
||||
# Führt die Installation via main.py aus
|
||||
|
||||
# Run installation via main.py
|
||||
runpy.run_module("main", run_name="__main__")
|
||||
|
||||
# Nach erfolgreicher Installation: pkgmgr --help anzeigen (Debug)
|
||||
# After successful installation: run `pkgmgr --help` for debug
|
||||
pkgmgr_help_debug()
|
||||
|
||||
finally:
|
||||
sys.argv = original_argv
|
||||
# Restore full environment
|
||||
os.environ.clear()
|
||||
os.environ.update(original_environ)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -13,6 +13,7 @@ from pkgmgr.actions.release.files import (
|
||||
update_spec_version,
|
||||
update_changelog,
|
||||
update_debian_changelog,
|
||||
update_spec_changelog,
|
||||
)
|
||||
|
||||
|
||||
@@ -310,5 +311,94 @@ class TestUpdateDebianChangelog(unittest.TestCase):
|
||||
self.assertEqual(content, original)
|
||||
|
||||
|
||||
class TestUpdateSpecChangelog(unittest.TestCase):
|
||||
def test_update_spec_changelog_inserts_stanza_after_changelog_marker(self) -> None:
|
||||
original = textwrap.dedent(
|
||||
"""
|
||||
Name: package-manager
|
||||
Version: 0.1.0
|
||||
Release: 5%{?dist}
|
||||
|
||||
%description
|
||||
Some description.
|
||||
|
||||
%changelog
|
||||
* Mon Jan 01 2024 Old Maintainer <old@example.com> - 0.1.0-1
|
||||
- Old entry
|
||||
"""
|
||||
).strip() + "\n"
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
path = os.path.join(tmpdir, "package-manager.spec")
|
||||
with open(path, "w", encoding="utf-8") as f:
|
||||
f.write(original)
|
||||
|
||||
with patch.dict(
|
||||
os.environ,
|
||||
{
|
||||
"DEBFULLNAME": "Test Maintainer",
|
||||
"DEBEMAIL": "test@example.com",
|
||||
},
|
||||
clear=False,
|
||||
):
|
||||
update_spec_changelog(
|
||||
spec_path=path,
|
||||
package_name="package-manager",
|
||||
new_version="1.2.3",
|
||||
message="Fedora changelog entry",
|
||||
preview=False,
|
||||
)
|
||||
|
||||
with open(path, "r", encoding="utf-8") as f:
|
||||
content = f.read()
|
||||
|
||||
# Neue Stanza muss nach %changelog stehen
|
||||
self.assertIn("%changelog", content)
|
||||
self.assertIn("Fedora changelog entry", content)
|
||||
self.assertIn("Test Maintainer <test@example.com>", content)
|
||||
# Alte Einträge müssen erhalten bleiben
|
||||
self.assertIn("Old Maintainer <old@example.com>", content)
|
||||
|
||||
def test_update_spec_changelog_preview_does_not_write(self) -> None:
|
||||
original = textwrap.dedent(
|
||||
"""
|
||||
Name: package-manager
|
||||
Version: 0.1.0
|
||||
Release: 5%{?dist}
|
||||
|
||||
%changelog
|
||||
* Mon Jan 01 2024 Old Maintainer <old@example.com> - 0.1.0-1
|
||||
- Old entry
|
||||
"""
|
||||
).strip() + "\n"
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
path = os.path.join(tmpdir, "package-manager.spec")
|
||||
with open(path, "w", encoding="utf-8") as f:
|
||||
f.write(original)
|
||||
|
||||
with patch.dict(
|
||||
os.environ,
|
||||
{
|
||||
"DEBFULLNAME": "Test Maintainer",
|
||||
"DEBEMAIL": "test@example.com",
|
||||
},
|
||||
clear=False,
|
||||
):
|
||||
update_spec_changelog(
|
||||
spec_path=path,
|
||||
package_name="package-manager",
|
||||
new_version="1.2.3",
|
||||
message="Fedora changelog entry",
|
||||
preview=True,
|
||||
)
|
||||
|
||||
with open(path, "r", encoding="utf-8") as f:
|
||||
content = f.read()
|
||||
|
||||
# Im Preview-Modus darf nichts verändert werden
|
||||
self.assertEqual(content, original)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
@@ -75,16 +75,19 @@ class TestUpdateLatestTag(unittest.TestCase):
|
||||
mock_run_git_command.assert_not_called()
|
||||
|
||||
@patch("pkgmgr.actions.release.git_ops.run_git_command")
|
||||
def test_update_latest_tag_real_calls_git(
|
||||
def test_update_latest_tag_real_calls_git_with_dereference_and_message(
|
||||
self,
|
||||
mock_run_git_command,
|
||||
) -> None:
|
||||
update_latest_tag("v1.2.3", preview=False)
|
||||
|
||||
calls = [c.args[0] for c in mock_run_git_command.call_args_list]
|
||||
self.assertIn("git tag -f latest v1.2.3", calls)
|
||||
# Must dereference the tag object and create an annotated tag with message
|
||||
self.assertIn(
|
||||
'git tag -f -a latest v1.2.3^{} -m "Floating latest tag for v1.2.3"',
|
||||
calls,
|
||||
)
|
||||
self.assertIn("git push origin latest --force", calls)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
@@ -19,6 +19,7 @@ class TestReleaseOrchestration(unittest.TestCase):
|
||||
patch("pkgmgr.actions.release.update_pkgbuild_version") as mock_update_pkgbuild, \
|
||||
patch("pkgmgr.actions.release.update_spec_version") as mock_update_spec, \
|
||||
patch("pkgmgr.actions.release.update_debian_changelog") as mock_update_debian_changelog, \
|
||||
patch("pkgmgr.actions.release.update_spec_changelog") as mock_update_spec_changelog, \
|
||||
patch("pkgmgr.actions.release.run_git_command") as mock_run_git_command, \
|
||||
patch("pkgmgr.actions.release.sync_branch_with_remote") as mock_sync_branch, \
|
||||
patch("pkgmgr.actions.release.update_latest_tag") as mock_update_latest_tag:
|
||||
@@ -48,7 +49,7 @@ class TestReleaseOrchestration(unittest.TestCase):
|
||||
self.assertEqual(args[1], "1.2.4")
|
||||
self.assertEqual(kwargs.get("preview"), False)
|
||||
|
||||
# changelog update
|
||||
# changelog update (Projekt)
|
||||
mock_update_changelog.assert_called_once()
|
||||
args, kwargs = mock_update_changelog.call_args
|
||||
self.assertEqual(args[0], "CHANGELOG.md")
|
||||
@@ -72,6 +73,13 @@ class TestReleaseOrchestration(unittest.TestCase):
|
||||
False,
|
||||
)
|
||||
|
||||
# Fedora / RPM %changelog helper
|
||||
mock_update_spec_changelog.assert_called_once()
|
||||
self.assertEqual(
|
||||
mock_update_spec_changelog.call_args[1].get("preview"),
|
||||
False,
|
||||
)
|
||||
|
||||
# Git operations
|
||||
mock_get_current_branch.assert_called_once()
|
||||
self.assertEqual(mock_get_current_branch.return_value, "develop")
|
||||
@@ -96,6 +104,7 @@ class TestReleaseOrchestration(unittest.TestCase):
|
||||
patch("pkgmgr.actions.release.update_pkgbuild_version") as mock_update_pkgbuild, \
|
||||
patch("pkgmgr.actions.release.update_spec_version") as mock_update_spec, \
|
||||
patch("pkgmgr.actions.release.update_debian_changelog") as mock_update_debian_changelog, \
|
||||
patch("pkgmgr.actions.release.update_spec_changelog") as mock_update_spec_changelog, \
|
||||
patch("pkgmgr.actions.release.run_git_command") as mock_run_git_command, \
|
||||
patch("pkgmgr.actions.release.sync_branch_with_remote") as mock_sync_branch, \
|
||||
patch("pkgmgr.actions.release.update_latest_tag") as mock_update_latest_tag:
|
||||
@@ -129,6 +138,10 @@ class TestReleaseOrchestration(unittest.TestCase):
|
||||
mock_update_debian_changelog.assert_called_once()
|
||||
self.assertTrue(mock_update_debian_changelog.call_args[1].get("preview"))
|
||||
|
||||
# Fedora / RPM spec changelog helper in preview mode
|
||||
mock_update_spec_changelog.assert_called_once()
|
||||
self.assertTrue(mock_update_spec_changelog.call_args[1].get("preview"))
|
||||
|
||||
# In preview mode no real git commands must be executed
|
||||
mock_run_git_command.assert_not_called()
|
||||
|
||||
|
||||
309
tests/unit/pkgmgr/actions/repos/test_pull_with_verification.py
Normal file
309
tests/unit/pkgmgr/actions/repos/test_pull_with_verification.py
Normal file
@@ -0,0 +1,309 @@
|
||||
import io
|
||||
import unittest
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
from pkgmgr.actions.repository.pull import pull_with_verification
|
||||
|
||||
|
||||
class TestPullWithVerification(unittest.TestCase):
|
||||
"""
|
||||
Comprehensive unit tests for pull_with_verification().
|
||||
|
||||
These tests verify:
|
||||
- Preview mode behaviour
|
||||
- Verification logic (prompting, bypassing, skipping)
|
||||
- subprocess.run invocation
|
||||
- Repository directory existence checks
|
||||
- Handling of extra git pull arguments
|
||||
"""
|
||||
|
||||
def _setup_mocks(self, mock_exists, mock_get_repo_id, mock_get_repo_dir,
|
||||
mock_verify, exists=True, verified_ok=True,
|
||||
errors=None, verified_info=True):
|
||||
"""Helper to configure repetitive mock behavior."""
|
||||
repo = {
|
||||
"name": "pkgmgr",
|
||||
"verified": {"gpg_keys": ["ABCDEF"]} if verified_info else None,
|
||||
}
|
||||
mock_exists.return_value = exists
|
||||
mock_get_repo_id.return_value = "pkgmgr"
|
||||
mock_get_repo_dir.return_value = "/fake/base/pkgmgr"
|
||||
mock_verify.return_value = (
|
||||
verified_ok,
|
||||
errors or [],
|
||||
"deadbeef", # commit hash
|
||||
"ABCDEF", # signing key
|
||||
)
|
||||
return repo
|
||||
|
||||
# ---------------------------------------------------------------------
|
||||
@patch("pkgmgr.actions.repository.pull.subprocess.run")
|
||||
@patch("pkgmgr.actions.repository.pull.verify_repository")
|
||||
@patch("pkgmgr.actions.repository.pull.get_repo_dir")
|
||||
@patch("pkgmgr.actions.repository.pull.get_repo_identifier")
|
||||
@patch("pkgmgr.actions.repository.pull.os.path.exists")
|
||||
@patch("builtins.input")
|
||||
def test_preview_mode_non_interactive(
|
||||
self,
|
||||
mock_input,
|
||||
mock_exists,
|
||||
mock_get_repo_id,
|
||||
mock_get_repo_dir,
|
||||
mock_verify,
|
||||
mock_subprocess,
|
||||
):
|
||||
"""
|
||||
Preview mode must NEVER request user input and must NEVER execute git.
|
||||
It must only print the preview command.
|
||||
"""
|
||||
repo = self._setup_mocks(
|
||||
mock_exists,
|
||||
mock_get_repo_id,
|
||||
mock_get_repo_dir,
|
||||
mock_verify,
|
||||
exists=True,
|
||||
verified_ok=False,
|
||||
errors=["bad signature"],
|
||||
verified_info=True,
|
||||
)
|
||||
|
||||
buf = io.StringIO()
|
||||
with patch("sys.stdout", new=buf):
|
||||
pull_with_verification(
|
||||
selected_repos=[repo],
|
||||
repositories_base_dir="/fake/base",
|
||||
all_repos=[repo],
|
||||
extra_args=["--ff-only"],
|
||||
no_verification=False,
|
||||
preview=True,
|
||||
)
|
||||
|
||||
output = buf.getvalue()
|
||||
self.assertIn(
|
||||
"[Preview] In '/fake/base/pkgmgr': git pull --ff-only",
|
||||
output,
|
||||
)
|
||||
|
||||
mock_input.assert_not_called()
|
||||
mock_subprocess.assert_not_called()
|
||||
|
||||
# ---------------------------------------------------------------------
|
||||
@patch("pkgmgr.actions.repository.pull.subprocess.run")
|
||||
@patch("pkgmgr.actions.repository.pull.verify_repository")
|
||||
@patch("pkgmgr.actions.repository.pull.get_repo_dir")
|
||||
@patch("pkgmgr.actions.repository.pull.get_repo_identifier")
|
||||
@patch("pkgmgr.actions.repository.pull.os.path.exists")
|
||||
@patch("builtins.input")
|
||||
def test_verification_failure_user_declines(
|
||||
self,
|
||||
mock_input,
|
||||
mock_exists,
|
||||
mock_get_repo_id,
|
||||
mock_get_repo_dir,
|
||||
mock_verify,
|
||||
mock_subprocess,
|
||||
):
|
||||
"""
|
||||
If verification fails and preview=False, the user is prompted.
|
||||
If the user declines ('n'), no git command is executed.
|
||||
"""
|
||||
repo = self._setup_mocks(
|
||||
mock_exists,
|
||||
mock_get_repo_id,
|
||||
mock_get_repo_dir,
|
||||
mock_verify,
|
||||
verified_ok=False,
|
||||
errors=["signature invalid"],
|
||||
)
|
||||
|
||||
mock_input.return_value = "n"
|
||||
|
||||
buf = io.StringIO()
|
||||
with patch("sys.stdout", new=buf):
|
||||
pull_with_verification(
|
||||
selected_repos=[repo],
|
||||
repositories_base_dir="/fake/base",
|
||||
all_repos=[repo],
|
||||
extra_args=[],
|
||||
no_verification=False,
|
||||
preview=False,
|
||||
)
|
||||
|
||||
mock_input.assert_called_once()
|
||||
mock_subprocess.assert_not_called()
|
||||
|
||||
# ---------------------------------------------------------------------
|
||||
@patch("pkgmgr.actions.repository.pull.subprocess.run")
|
||||
@patch("pkgmgr.actions.repository.pull.verify_repository")
|
||||
@patch("pkgmgr.actions.repository.pull.get_repo_dir")
|
||||
@patch("pkgmgr.actions.repository.pull.get_repo_identifier")
|
||||
@patch("pkgmgr.actions.repository.pull.os.path.exists")
|
||||
@patch("builtins.input")
|
||||
def test_verification_failure_user_accepts_runs_git(
|
||||
self,
|
||||
mock_input,
|
||||
mock_exists,
|
||||
mock_get_repo_id,
|
||||
mock_get_repo_dir,
|
||||
mock_verify,
|
||||
mock_subprocess,
|
||||
):
|
||||
"""
|
||||
If verification fails and the user accepts ('y'),
|
||||
then the git pull should be executed.
|
||||
"""
|
||||
repo = self._setup_mocks(
|
||||
mock_exists,
|
||||
mock_get_repo_id,
|
||||
mock_get_repo_dir,
|
||||
mock_verify,
|
||||
verified_ok=False,
|
||||
errors=["invalid"],
|
||||
)
|
||||
|
||||
mock_input.return_value = "y"
|
||||
mock_subprocess.return_value = MagicMock(returncode=0)
|
||||
|
||||
pull_with_verification(
|
||||
selected_repos=[repo],
|
||||
repositories_base_dir="/fake/base",
|
||||
all_repos=[repo],
|
||||
extra_args=[],
|
||||
no_verification=False,
|
||||
preview=False,
|
||||
)
|
||||
|
||||
mock_subprocess.assert_called_once()
|
||||
mock_input.assert_called_once()
|
||||
|
||||
# ---------------------------------------------------------------------
|
||||
@patch("pkgmgr.actions.repository.pull.subprocess.run")
|
||||
@patch("pkgmgr.actions.repository.pull.verify_repository")
|
||||
@patch("pkgmgr.actions.repository.pull.get_repo_dir")
|
||||
@patch("pkgmgr.actions.repository.pull.get_repo_identifier")
|
||||
@patch("pkgmgr.actions.repository.pull.os.path.exists")
|
||||
@patch("builtins.input")
|
||||
def test_verification_success_no_prompt(
|
||||
self,
|
||||
mock_input,
|
||||
mock_exists,
|
||||
mock_get_repo_id,
|
||||
mock_get_repo_dir,
|
||||
mock_verify,
|
||||
mock_subprocess,
|
||||
):
|
||||
"""
|
||||
If verification is successful, the user should NOT be prompted,
|
||||
and git pull should run immediately.
|
||||
"""
|
||||
repo = self._setup_mocks(
|
||||
mock_exists,
|
||||
mock_get_repo_id,
|
||||
mock_get_repo_dir,
|
||||
mock_verify,
|
||||
verified_ok=True,
|
||||
)
|
||||
|
||||
mock_subprocess.return_value = MagicMock(returncode=0)
|
||||
|
||||
pull_with_verification(
|
||||
selected_repos=[repo],
|
||||
repositories_base_dir="/fake/base",
|
||||
all_repos=[repo],
|
||||
extra_args=["--rebase"],
|
||||
no_verification=False,
|
||||
preview=False,
|
||||
)
|
||||
|
||||
mock_input.assert_not_called()
|
||||
mock_subprocess.assert_called_once()
|
||||
cmd = mock_subprocess.call_args[0][0]
|
||||
self.assertIn("git pull --rebase", cmd)
|
||||
|
||||
# ---------------------------------------------------------------------
|
||||
@patch("pkgmgr.actions.repository.pull.subprocess.run")
|
||||
@patch("pkgmgr.actions.repository.pull.verify_repository")
|
||||
@patch("pkgmgr.actions.repository.pull.get_repo_dir")
|
||||
@patch("pkgmgr.actions.repository.pull.get_repo_identifier")
|
||||
@patch("pkgmgr.actions.repository.pull.os.path.exists")
|
||||
@patch("builtins.input")
|
||||
def test_directory_missing_skips_repo(
|
||||
self,
|
||||
mock_input,
|
||||
mock_exists,
|
||||
mock_get_repo_id,
|
||||
mock_get_repo_dir,
|
||||
mock_verify,
|
||||
mock_subprocess,
|
||||
):
|
||||
"""
|
||||
If the repository directory does not exist, the repo must be skipped
|
||||
silently and no git command executed.
|
||||
"""
|
||||
repo = self._setup_mocks(
|
||||
mock_exists,
|
||||
mock_get_repo_id,
|
||||
mock_get_repo_dir,
|
||||
mock_verify,
|
||||
exists=False,
|
||||
)
|
||||
|
||||
buf = io.StringIO()
|
||||
with patch("sys.stdout", new=buf):
|
||||
pull_with_verification(
|
||||
selected_repos=[repo],
|
||||
repositories_base_dir="/fake/base",
|
||||
all_repos=[repo],
|
||||
extra_args=[],
|
||||
no_verification=False,
|
||||
preview=False,
|
||||
)
|
||||
|
||||
output = buf.getvalue()
|
||||
self.assertIn("not found", output)
|
||||
|
||||
mock_input.assert_not_called()
|
||||
mock_subprocess.assert_not_called()
|
||||
|
||||
# ---------------------------------------------------------------------
|
||||
@patch("pkgmgr.actions.repository.pull.subprocess.run")
|
||||
@patch("pkgmgr.actions.repository.pull.verify_repository")
|
||||
@patch("pkgmgr.actions.repository.pull.get_repo_dir")
|
||||
@patch("pkgmgr.actions.repository.pull.get_repo_identifier")
|
||||
@patch("pkgmgr.actions.repository.pull.os.path.exists")
|
||||
@patch("builtins.input")
|
||||
def test_no_verification_flag_skips_prompt(
|
||||
self,
|
||||
mock_input,
|
||||
mock_exists,
|
||||
mock_get_repo_id,
|
||||
mock_get_repo_dir,
|
||||
mock_verify,
|
||||
mock_subprocess,
|
||||
):
|
||||
"""
|
||||
If no_verification=True, verification failures must NOT prompt.
|
||||
Git pull should run directly.
|
||||
"""
|
||||
repo = self._setup_mocks(
|
||||
mock_exists,
|
||||
mock_get_repo_id,
|
||||
mock_get_repo_dir,
|
||||
mock_verify,
|
||||
verified_ok=False,
|
||||
errors=["invalid"],
|
||||
)
|
||||
|
||||
mock_subprocess.return_value = MagicMock(returncode=0)
|
||||
|
||||
pull_with_verification(
|
||||
selected_repos=[repo],
|
||||
repositories_base_dir="/fake/base",
|
||||
all_repos=[repo],
|
||||
extra_args=[],
|
||||
no_verification=True,
|
||||
preview=False,
|
||||
)
|
||||
|
||||
mock_input.assert_not_called()
|
||||
mock_subprocess.assert_called_once()
|
||||
29
tests/unit/pkgmgr/core/repository/test_ignored.py
Normal file
29
tests/unit/pkgmgr/core/repository/test_ignored.py
Normal file
@@ -0,0 +1,29 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import unittest
|
||||
|
||||
from pkgmgr.core.repository.ignored import filter_ignored
|
||||
|
||||
|
||||
class TestFilterIgnored(unittest.TestCase):
|
||||
def test_filter_ignored_removes_repos_with_ignore_true(self) -> None:
|
||||
repos = [
|
||||
{"provider": "github.com", "account": "user", "repository": "a", "ignore": True},
|
||||
{"provider": "github.com", "account": "user", "repository": "b", "ignore": False},
|
||||
{"provider": "github.com", "account": "user", "repository": "c"},
|
||||
]
|
||||
|
||||
result = filter_ignored(repos)
|
||||
|
||||
identifiers = {(r["provider"], r["account"], r["repository"]) for r in result}
|
||||
self.assertNotIn(("github.com", "user", "a"), identifiers)
|
||||
self.assertIn(("github.com", "user", "b"), identifiers)
|
||||
self.assertIn(("github.com", "user", "c"), identifiers)
|
||||
|
||||
def test_filter_ignored_empty_list_returns_empty_list(self) -> None:
|
||||
self.assertEqual(filter_ignored([]), [])
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
180
tests/unit/pkgmgr/core/repository/test_selected.py
Normal file
180
tests/unit/pkgmgr/core/repository/test_selected.py
Normal file
@@ -0,0 +1,180 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import os
|
||||
import unittest
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import patch
|
||||
|
||||
from pkgmgr.core.repository.selected import get_selected_repos
|
||||
|
||||
|
||||
def _repo(
|
||||
provider: str,
|
||||
account: str,
|
||||
repository: str,
|
||||
ignore: bool | None = None,
|
||||
directory: str | None = None,
|
||||
):
|
||||
repo = {
|
||||
"provider": provider,
|
||||
"account": account,
|
||||
"repository": repository,
|
||||
}
|
||||
if ignore is not None:
|
||||
repo["ignore"] = ignore
|
||||
if directory is not None:
|
||||
repo["directory"] = directory
|
||||
return repo
|
||||
|
||||
|
||||
class TestGetSelectedRepos(unittest.TestCase):
|
||||
def setUp(self) -> None:
|
||||
self.repo_ignored = _repo(
|
||||
"github.com",
|
||||
"user",
|
||||
"ignored-repo",
|
||||
ignore=True,
|
||||
directory="/repos/github.com/user/ignored-repo",
|
||||
)
|
||||
self.repo_visible = _repo(
|
||||
"github.com",
|
||||
"user",
|
||||
"visible-repo",
|
||||
ignore=False,
|
||||
directory="/repos/github.com/user/visible-repo",
|
||||
)
|
||||
self.all_repos = [self.repo_ignored, self.repo_visible]
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# 1) Explizite Identifier – ignorierte Repos dürfen ausgewählt werden
|
||||
# ------------------------------------------------------------------
|
||||
def test_identifiers_bypass_ignore_filter(self) -> None:
|
||||
args = SimpleNamespace(
|
||||
identifiers=["ignored-repo"], # matches by repository name
|
||||
all=False,
|
||||
category=[],
|
||||
string="",
|
||||
tag=[],
|
||||
include_ignored=False, # should be ignored for explicit identifiers
|
||||
)
|
||||
|
||||
selected = get_selected_repos(args, self.all_repos)
|
||||
|
||||
self.assertEqual(len(selected), 1)
|
||||
self.assertIs(selected[0], self.repo_ignored)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# 2) Filter-only Modus – ignorierte Repos werden rausgefiltert
|
||||
# ------------------------------------------------------------------
|
||||
def test_filter_mode_excludes_ignored_by_default(self) -> None:
|
||||
# string-Filter, der beide Repos matchen würde
|
||||
args = SimpleNamespace(
|
||||
identifiers=[],
|
||||
all=False,
|
||||
category=[],
|
||||
string="repo", # substring in beiden Namen
|
||||
tag=[],
|
||||
include_ignored=False,
|
||||
)
|
||||
|
||||
selected = get_selected_repos(args, self.all_repos)
|
||||
|
||||
self.assertEqual(len(selected), 1)
|
||||
self.assertIs(selected[0], self.repo_visible)
|
||||
|
||||
def test_filter_mode_can_include_ignored_when_flag_set(self) -> None:
|
||||
args = SimpleNamespace(
|
||||
identifiers=[],
|
||||
all=False,
|
||||
category=[],
|
||||
string="repo",
|
||||
tag=[],
|
||||
include_ignored=True,
|
||||
)
|
||||
|
||||
selected = get_selected_repos(args, self.all_repos)
|
||||
|
||||
# Beide Repos sollten erscheinen, weil include_ignored=True
|
||||
self.assertEqual({r["repository"] for r in selected}, {"ignored-repo", "visible-repo"})
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# 3) --all Modus – ignorierte Repos werden per Default entfernt
|
||||
# ------------------------------------------------------------------
|
||||
def test_all_mode_excludes_ignored_by_default(self) -> None:
|
||||
args = SimpleNamespace(
|
||||
identifiers=[],
|
||||
all=True,
|
||||
category=[],
|
||||
string="",
|
||||
tag=[],
|
||||
include_ignored=False,
|
||||
)
|
||||
|
||||
selected = get_selected_repos(args, self.all_repos)
|
||||
|
||||
self.assertEqual(len(selected), 1)
|
||||
self.assertIs(selected[0], self.repo_visible)
|
||||
|
||||
def test_all_mode_can_include_ignored_when_flag_set(self) -> None:
|
||||
args = SimpleNamespace(
|
||||
identifiers=[],
|
||||
all=True,
|
||||
category=[],
|
||||
string="",
|
||||
tag=[],
|
||||
include_ignored=True,
|
||||
)
|
||||
|
||||
selected = get_selected_repos(args, self.all_repos)
|
||||
|
||||
self.assertEqual(len(selected), 2)
|
||||
self.assertCountEqual(
|
||||
[r["repository"] for r in selected],
|
||||
["ignored-repo", "visible-repo"],
|
||||
)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# 4) CWD-Modus – Repo anhand des aktuellen Verzeichnisses auswählen
|
||||
# ------------------------------------------------------------------
|
||||
def test_cwd_selection_excludes_ignored_by_default(self) -> None:
|
||||
# Wir lassen CWD auf das Verzeichnis des ignorierten Repos zeigen.
|
||||
cwd = os.path.abspath(self.repo_ignored["directory"])
|
||||
|
||||
args = SimpleNamespace(
|
||||
identifiers=[],
|
||||
all=False,
|
||||
category=[],
|
||||
string="",
|
||||
tag=[],
|
||||
include_ignored=False,
|
||||
)
|
||||
|
||||
with patch("os.getcwd", return_value=cwd):
|
||||
selected = get_selected_repos(args, self.all_repos)
|
||||
|
||||
# Da das einzige Repo für dieses Verzeichnis ignoriert ist,
|
||||
# sollte die Auswahl leer sein.
|
||||
self.assertEqual(selected, [])
|
||||
|
||||
def test_cwd_selection_can_include_ignored_when_flag_set(self) -> None:
|
||||
cwd = os.path.abspath(self.repo_ignored["directory"])
|
||||
|
||||
args = SimpleNamespace(
|
||||
identifiers=[],
|
||||
all=False,
|
||||
category=[],
|
||||
string="",
|
||||
tag=[],
|
||||
include_ignored=True,
|
||||
)
|
||||
|
||||
with patch("os.getcwd", return_value=cwd):
|
||||
selected = get_selected_repos(args, self.all_repos)
|
||||
|
||||
self.assertEqual(len(selected), 1)
|
||||
self.assertIs(selected[0], self.repo_ignored)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user