refactor(release/git): replace shell git calls with command/query helpers
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / lint-shell (push) Has been cancelled
Mark stable commit / lint-python (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled

- Remove legacy shell-based git helpers from release workflow
- Introduce typed git command wrappers (add, commit, fetch, pull_ff_only, push, tag*)
- Add git queries for upstream detection and tag listing
- Refactor release workflow to use core git commands consistently
- Implement semantic vX.Y.Z tag comparison without external sort
- Ensure prerelease tags (e.g. -rc) do not outrank final releases
- Split and update unit tests to match new command/query architecture
This commit is contained in:
Kevin Veen-Birkenbach
2025-12-16 12:30:36 +01:00
parent 486863eb58
commit f83e192e37
18 changed files with 583 additions and 276 deletions

View File

@@ -1,73 +1,90 @@
from __future__ import annotations
import subprocess
from pkgmgr.core.git import GitError
from pkgmgr.core.git.commands import (
fetch,
pull_ff_only,
push,
tag_force_annotated,
)
from pkgmgr.core.git.queries import get_upstream_ref, list_tags
def run_git_command(cmd: str) -> None:
print(f"[GIT] {cmd}")
try:
subprocess.run(
cmd,
shell=True,
check=True,
text=True,
capture_output=True,
)
except subprocess.CalledProcessError as exc:
print(f"[ERROR] Git command failed: {cmd}")
print(f" Exit code: {exc.returncode}")
if exc.stdout:
print("\n" + exc.stdout)
if exc.stderr:
print("\n" + exc.stderr)
raise GitError(f"Git command failed: {cmd}") from exc
def _capture(cmd: str) -> str:
res = subprocess.run(cmd, shell=True, check=False, capture_output=True, text=True)
return (res.stdout or "").strip()
def ensure_clean_and_synced(preview: bool = False) -> None:
def ensure_clean_and_synced(*, preview: bool = False) -> None:
"""
Always run a pull BEFORE modifying anything.
Uses --ff-only to avoid creating merge commits automatically.
If no upstream is configured, we skip.
"""
upstream = _capture("git rev-parse --abbrev-ref --symbolic-full-name @{u} 2>/dev/null")
upstream = get_upstream_ref()
if not upstream:
print("[INFO] No upstream configured for current branch. Skipping pull.")
return
if preview:
print("[PREVIEW] Would run: git fetch origin --prune --tags --force")
print("[PREVIEW] Would run: git pull --ff-only")
return
print("[INFO] Syncing with remote before making any changes...")
run_git_command("git fetch origin --prune --tags --force")
run_git_command("git pull --ff-only")
# Mirrors old behavior:
# git fetch origin --prune --tags --force
# git pull --ff-only
fetch(remote="origin", prune=True, tags=True, force=True, preview=preview)
pull_ff_only(preview=preview)
def _parse_v_tag(tag: str) -> tuple[int, ...] | None:
"""
Parse tags like 'v1.2.3' into (1, 2, 3).
Returns None if parsing is not possible.
"""
if not tag.startswith("v"):
return None
raw = tag[1:]
if not raw:
return None
parts = raw.split(".")
out: list[int] = []
for p in parts:
if not p.isdigit():
return None
out.append(int(p))
return tuple(out) if out else None
def is_highest_version_tag(tag: str) -> bool:
"""
Return True if `tag` is the highest version among all tags matching v*.
Comparison uses `sort -V` for natural version ordering.
We avoid shelling out to `sort -V` and implement a small vX.Y.Z parser.
Non-parseable v* tags are ignored for version comparison.
"""
all_v = _capture("git tag --list 'v*'")
all_v = list_tags("v*")
if not all_v:
return True # No tags yet, so the current tag is the highest
return True # No tags yet -> current is highest by definition
# Get the latest tag in natural version order
latest = _capture("git tag --list 'v*' | sort -V | tail -n1")
print(f"[INFO] Latest tag: {latest}, Current tag: {tag}")
# Ensure that the current tag is always considered the highest if it's the latest one
return tag >= latest # Use comparison operator to consider all future tags
parsed_current = _parse_v_tag(tag)
if parsed_current is None:
# If the "current" tag isn't parseable, fall back to conservative behavior:
# treat it as highest only if it matches the max lexicographically.
latest_lex = max(all_v)
print(f"[INFO] Latest tag (lex): {latest_lex}, Current tag: {tag}")
return tag >= latest_lex
parsed_all: list[tuple[int, ...]] = []
for t in all_v:
parsed = _parse_v_tag(t)
if parsed is not None:
parsed_all.append(parsed)
if not parsed_all:
# No parseable tags -> nothing to compare against
return True
latest = max(parsed_all)
print(f"[INFO] Latest tag (parsed): v{'.'.join(map(str, latest))}, Current tag: {tag}")
return parsed_current >= latest
def update_latest_tag(new_tag: str, preview: bool = False) -> None:
def update_latest_tag(new_tag: str, *, preview: bool = False) -> None:
"""
Move the floating 'latest' tag to the newly created release tag.
@@ -78,15 +95,10 @@ def update_latest_tag(new_tag: str, preview: bool = False) -> None:
target_ref = f"{new_tag}^{{}}"
print(f"[INFO] Updating 'latest' tag to point at {new_tag} (commit {target_ref})...")
if preview:
print(
f'[PREVIEW] Would run: git tag -f -a latest {target_ref} '
f'-m "Floating latest tag for {new_tag}"'
)
print("[PREVIEW] Would run: git push origin latest --force")
return
run_git_command(
f'git tag -f -a latest {target_ref} -m "Floating latest tag for {new_tag}"'
tag_force_annotated(
name="latest",
target=target_ref,
message=f"Floating latest tag for {new_tag}",
preview=preview,
)
run_git_command("git push origin latest --force")
push("origin", "latest", force=True, preview=preview)

View File

@@ -6,6 +6,7 @@ from typing import Optional
from pkgmgr.actions.branch import close_branch
from pkgmgr.core.git import GitError
from pkgmgr.core.git.commands import add, commit, push, tag_annotated
from pkgmgr.core.git.queries import get_current_branch
from pkgmgr.core.repository.paths import resolve_repo_paths
@@ -21,7 +22,6 @@ from .files import (
from .git_ops import (
ensure_clean_and_synced,
is_highest_version_tag,
run_git_command,
update_latest_tag,
)
from .prompts import confirm_proceed_release, should_delete_branch
@@ -126,12 +126,11 @@ def _release_impl(
existing_files = [p for p in files_to_add if isinstance(p, str) and p and os.path.exists(p)]
if preview:
for path in existing_files:
print(f"[PREVIEW] Would run: git add {path}")
print(f'[PREVIEW] Would run: git commit -am "{commit_msg}"')
print(f'[PREVIEW] Would run: git tag -a {new_tag} -m "{tag_msg}"')
print(f"[PREVIEW] Would run: git push origin {branch}")
print(f"[PREVIEW] Would run: git push origin {new_tag}")
add(existing_files, preview=True)
commit(commit_msg, all=True, preview=True)
tag_annotated(new_tag, tag_msg, preview=True)
push("origin", branch, preview=True)
push("origin", new_tag, preview=True)
if is_highest_version_tag(new_tag):
update_latest_tag(new_tag, preview=True)
@@ -145,15 +144,13 @@ def _release_impl(
print(f"[PREVIEW] Would ask whether to delete branch {branch} after release.")
return
for path in existing_files:
run_git_command(f"git add {path}")
run_git_command(f'git commit -am "{commit_msg}"')
run_git_command(f'git tag -a {new_tag} -m "{tag_msg}"')
add(existing_files, preview=False)
commit(commit_msg, all=True, preview=False)
tag_annotated(new_tag, tag_msg, preview=False)
# Push branch and ONLY the newly created version tag (no --tags)
run_git_command(f"git push origin {branch}")
run_git_command(f"git push origin {new_tag}")
push("origin", branch, preview=False)
push("origin", new_tag, preview=False)
# Update 'latest' only if this is the highest version tag
try:

View File

@@ -1,25 +1,33 @@
from __future__ import annotations
from .add import GitAddError, add
from .checkout import GitCheckoutError, checkout
from .commit import GitCommitError, commit
from .create_branch import GitCreateBranchError, create_branch
from .delete_local_branch import GitDeleteLocalBranchError, delete_local_branch
from .delete_remote_branch import GitDeleteRemoteBranchError, delete_remote_branch
from .fetch import GitFetchError, fetch
from .merge_no_ff import GitMergeError, merge_no_ff
from .pull import GitPullError, pull
from .pull_ff_only import GitPullFfOnlyError, pull_ff_only
from .push import GitPushError, push
from .create_branch import GitCreateBranchError, create_branch
from .push_upstream import GitPushUpstreamError, push_upstream
from .add_remote import GitAddRemoteError, add_remote
from .set_remote_url import GitSetRemoteUrlError, set_remote_url
from .add_remote_push_url import GitAddRemotePushUrlError, add_remote_push_url
from .set_remote_url import GitSetRemoteUrlError, set_remote_url
from .tag_annotated import GitTagAnnotatedError, tag_annotated
from .tag_force_annotated import GitTagForceAnnotatedError, tag_force_annotated
__all__ = [
"add",
"fetch",
"checkout",
"pull",
"pull_ff_only",
"merge_no_ff",
"push",
"commit",
"delete_local_branch",
"delete_remote_branch",
"create_branch",
@@ -27,11 +35,16 @@ __all__ = [
"add_remote",
"set_remote_url",
"add_remote_push_url",
"tag_annotated",
"tag_force_annotated",
"GitAddError",
"GitFetchError",
"GitCheckoutError",
"GitPullError",
"GitPullFfOnlyError",
"GitMergeError",
"GitPushError",
"GitCommitError",
"GitDeleteLocalBranchError",
"GitDeleteRemoteBranchError",
"GitCreateBranchError",
@@ -39,4 +52,6 @@ __all__ = [
"GitAddRemoteError",
"GitSetRemoteUrlError",
"GitAddRemotePushUrlError",
"GitTagAnnotatedError",
"GitTagForceAnnotatedError",
]

View File

@@ -0,0 +1,44 @@
from __future__ import annotations
from typing import Iterable, List, Sequence, Union
from ..errors import GitError, GitCommandError
from ..run import run
class GitAddError(GitCommandError):
"""Raised when `git add` fails."""
PathLike = Union[str, Sequence[str], Iterable[str]]
def _normalize_paths(paths: PathLike) -> List[str]:
if isinstance(paths, str):
return [paths]
return [p for p in paths]
def add(
paths: PathLike,
*,
cwd: str = ".",
preview: bool = False,
) -> None:
"""
Stage one or multiple paths.
Equivalent to:
git add <path...>
"""
normalized = _normalize_paths(paths)
if not normalized:
return
try:
run(["add", *normalized], cwd=cwd, preview=preview)
except GitError as exc:
raise GitAddError(
f"Failed to add paths to staging area: {normalized!r}.",
cwd=cwd,
) from exc

View File

@@ -0,0 +1,37 @@
from __future__ import annotations
from ..errors import GitError, GitCommandError
from ..run import run
class GitCommitError(GitCommandError):
"""Raised when `git commit` fails."""
def commit(
message: str,
*,
cwd: str = ".",
all: bool = False,
preview: bool = False,
) -> None:
"""
Create a commit.
Equivalent to:
git commit -m "<message>"
or (if all=True):
git commit -am "<message>"
"""
args = ["commit"]
if all:
args.append("-a")
args += ["-m", message]
try:
run(args, cwd=cwd, preview=preview)
except GitError as exc:
raise GitCommitError(
"Failed to create commit.",
cwd=cwd,
) from exc

View File

@@ -8,9 +8,31 @@ class GitFetchError(GitCommandError):
"""Raised when fetching from a remote fails."""
def fetch(remote: str = "origin", cwd: str = ".") -> None:
def fetch(
remote: str = "origin",
*,
prune: bool = False,
tags: bool = False,
force: bool = False,
cwd: str = ".",
preview: bool = False,
) -> None:
"""
Fetch from a remote, optionally with prune/tags/force.
Equivalent to:
git fetch <remote> [--prune] [--tags] [--force]
"""
args = ["fetch", remote]
if prune:
args.append("--prune")
if tags:
args.append("--tags")
if force:
args.append("--force")
try:
run(["fetch", remote], cwd=cwd)
run(args, cwd=cwd, preview=preview)
except GitError as exc:
raise GitFetchError(
f"Failed to fetch from remote {remote!r}.",

View File

@@ -0,0 +1,24 @@
from __future__ import annotations
from ..errors import GitError, GitCommandError
from ..run import run
class GitPullFfOnlyError(GitCommandError):
"""Raised when pulling with --ff-only fails."""
def pull_ff_only(*, cwd: str = ".", preview: bool = False) -> None:
"""
Pull using fast-forward only.
Equivalent to:
git pull --ff-only
"""
try:
run(["pull", "--ff-only"], cwd=cwd, preview=preview)
except GitError as exc:
raise GitPullFfOnlyError(
"Failed to pull with --ff-only.",
cwd=cwd,
) from exc

View File

@@ -8,9 +8,26 @@ class GitPushError(GitCommandError):
"""Raised when pushing to a remote fails."""
def push(remote: str, ref: str, cwd: str = ".") -> None:
def push(
remote: str,
ref: str,
*,
force: bool = False,
cwd: str = ".",
preview: bool = False,
) -> None:
"""
Push a ref to a remote, optionally forced.
Equivalent to:
git push <remote> <ref> [--force]
"""
args = ["push", remote, ref]
if force:
args.append("--force")
try:
run(["push", remote, ref], cwd=cwd)
run(args, cwd=cwd, preview=preview)
except GitError as exc:
raise GitPushError(
f"Failed to push ref {ref!r} to remote {remote!r}.",

View File

@@ -0,0 +1,30 @@
from __future__ import annotations
from ..errors import GitError, GitCommandError
from ..run import run
class GitTagAnnotatedError(GitCommandError):
"""Raised when creating an annotated tag fails."""
def tag_annotated(
tag: str,
message: str,
*,
cwd: str = ".",
preview: bool = False,
) -> None:
"""
Create an annotated tag.
Equivalent to:
git tag -a <tag> -m "<message>"
"""
try:
run(["tag", "-a", tag, "-m", message], cwd=cwd, preview=preview)
except GitError as exc:
raise GitTagAnnotatedError(
f"Failed to create annotated tag {tag!r}.",
cwd=cwd,
) from exc

View File

@@ -0,0 +1,31 @@
from __future__ import annotations
from ..errors import GitError, GitCommandError
from ..run import run
class GitTagForceAnnotatedError(GitCommandError):
"""Raised when forcing an annotated tag fails."""
def tag_force_annotated(
name: str,
target: str,
message: str,
*,
cwd: str = ".",
preview: bool = False,
) -> None:
"""
Force-create an annotated tag pointing at a given target.
Equivalent to:
git tag -f -a <name> <target> -m "<message>"
"""
try:
run(["tag", "-f", "-a", name, target, "-m", message], cwd=cwd, preview=preview)
except GitError as exc:
raise GitTagForceAnnotatedError(
f"Failed to force annotated tag {name!r} at {target!r}.",
cwd=cwd,
) from exc

View File

@@ -11,6 +11,8 @@ from .probe_remote_reachable import probe_remote_reachable
from .get_changelog import get_changelog, GitChangelogQueryError
from .get_tags_at_ref import get_tags_at_ref, GitTagsAtRefQueryError
from .get_config_value import get_config_value
from .get_upstream_ref import get_upstream_ref
from .list_tags import list_tags
__all__ = [
"get_current_branch",
@@ -27,4 +29,6 @@ __all__ = [
"get_tags_at_ref",
"GitTagsAtRefQueryError",
"get_config_value",
"get_upstream_ref",
"list_tags",
]

View File

@@ -0,0 +1,25 @@
from __future__ import annotations
from typing import Optional
from ..errors import GitError
from ..run import run
def get_upstream_ref(*, cwd: str = ".") -> Optional[str]:
"""
Return the configured upstream ref for the current branch, or None if none.
Equivalent to:
git rev-parse --abbrev-ref --symbolic-full-name @{u}
"""
try:
out = run(
["rev-parse", "--abbrev-ref", "--symbolic-full-name", "@{u}"],
cwd=cwd,
)
except GitError:
return None
out = out.strip()
return out or None

View File

@@ -0,0 +1,18 @@
from __future__ import annotations
from typing import List
from ..run import run
def list_tags(pattern: str = "*", *, cwd: str = ".") -> List[str]:
"""
List tags matching a pattern.
Equivalent to:
git tag --list <pattern>
"""
out = run(["tag", "--list", pattern], cwd=cwd)
if not out:
return []
return [line.strip() for line in out.splitlines() if line.strip()]