Compare commits

..

3 Commits

Author SHA1 Message Date
Kevin Veen-Birkenbach
e135d39710 Release version 0.4.2 2025-12-09 00:03:46 +01:00
Kevin Veen-Birkenbach
76b7f84989 Release version 0.4.1 2025-12-08 23:20:28 +01:00
Kevin Veen-Birkenbach
1b53263f87 Release version 0.4.0 2025-12-08 23:02:43 +01:00
31 changed files with 1004 additions and 1758 deletions

View File

@@ -1,14 +1,16 @@
## [0.3.0] - 2025-12-08 ## [0.4.2] - 2025-12-09
* Massive refactor and feature expansion: * Wire pkgmgr release CLI to new helper and add unit tests (see ChatGPT conversation: https://chatgpt.com/share/69374f09-c760-800f-92e4-5b44a4510b62)
- Complete rewrite of config loading system (layered defaults + user config)
- New selection engine (--string, --category, --tag)
- Overhauled list output (colored statuses, alias highlight) ## [0.4.1] - 2025-12-08
- New config update logic + default YAML sync
- Improved proxy command handling * Add branch close subcommand and integrate release close/editor flow (ChatGPT: https://chatgpt.com/share/69374f09-c760-800f-92e4-5b44a4510b62)
- Full CLI routing refactor
- Expanded E2E tests for list, proxy, and selection logic
Conversation: https://chatgpt.com/share/693745c3-b8d8-800f-aa29-c8481a2ffae1 ## [0.4.0] - 2025-12-08
* Add branch closing helper and --close flag to release command, including CLI wiring and tests (see https://chatgpt.com/share/69374aec-74ec-800f-bde3-5d91dfdb9b91)
## [0.2.0] - 2025-12-08 ## [0.2.0] - 2025-12-08

View File

@@ -1,7 +1,7 @@
# Maintainer: Kevin Veen-Birkenbach <info@veen.world> # Maintainer: Kevin Veen-Birkenbach <info@veen.world>
pkgname=package-manager pkgname=package-manager
pkgver=0.3.0 pkgver=0.4.2
pkgrel=1 pkgrel=1
pkgdesc="Local-flake wrapper for Kevin's package-manager (Nix-based)." pkgdesc="Local-flake wrapper for Kevin's package-manager (Nix-based)."
arch=('any') arch=('any')

View File

@@ -1,7 +0,0 @@
- account: kevinveenbirkenbach
alias: gkfdrtdtcntr
provider: github.com
repository: federated-to-central-social-network-bridge
verified:
gpg_keys:
- 44D8F11FD62F878E

26
debian/changelog vendored
View File

@@ -1,16 +1,20 @@
package-manager (0.3.0-1) unstable; urgency=medium package-manager (0.4.2-1) unstable; urgency=medium
* Massive refactor and feature expansion: * Wire pkgmgr release CLI to new helper and add unit tests (see ChatGPT conversation: https://chatgpt.com/share/69374f09-c760-800f-92e4-5b44a4510b62)
- Complete rewrite of config loading system (layered defaults + user config)
- New selection engine (--string, --category, --tag)
- Overhauled list output (colored statuses, alias highlight)
- New config update logic + default YAML sync
- Improved proxy command handling
- Full CLI routing refactor
- Expanded E2E tests for list, proxy, and selection logic
Conversation: https://chatgpt.com/share/693745c3-b8d8-800f-aa29-c8481a2ffae1
-- Kevin Veen-Birkenbach <kevin@veen.world> Mon, 08 Dec 2025 22:40:49 +0100 -- Kevin Veen-Birkenbach <kevin@veen.world> Tue, 09 Dec 2025 00:03:46 +0100
package-manager (0.4.1-1) unstable; urgency=medium
* Add branch close subcommand and integrate release close/editor flow (ChatGPT: https://chatgpt.com/share/69374f09-c760-800f-92e4-5b44a4510b62)
-- Kevin Veen-Birkenbach <kevin@veen.world> Mon, 08 Dec 2025 23:20:28 +0100
package-manager (0.4.0-1) unstable; urgency=medium
* Add branch closing helper and --close flag to release command, including CLI wiring and tests (see https://chatgpt.com/share/69374aec-74ec-800f-bde3-5d91dfdb9b91)
-- Kevin Veen-Birkenbach <kevin@veen.world> Mon, 08 Dec 2025 23:02:43 +0100
package-manager (0.2.0-1) unstable; urgency=medium package-manager (0.2.0-1) unstable; urgency=medium

View File

@@ -31,7 +31,7 @@
rec { rec {
pkgmgr = pyPkgs.buildPythonApplication { pkgmgr = pyPkgs.buildPythonApplication {
pname = "package-manager"; pname = "package-manager";
version = "0.3.0"; version = "0.4.2";
# Use the git repo as source # Use the git repo as source
src = ./.; src = ./.;

View File

@@ -1,5 +1,5 @@
Name: package-manager Name: package-manager
Version: 0.3.0 Version: 0.4.2
Release: 1%{?dist} Release: 1%{?dist}
Summary: Wrapper that runs Kevin's package-manager via Nix flake Summary: Wrapper that runs Kevin's package-manager via Nix flake

View File

@@ -1,3 +1,4 @@
# pkgmgr/branch_commands.py
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
@@ -12,7 +13,7 @@ from __future__ import annotations
from typing import Optional from typing import Optional
from pkgmgr.git_utils import run_git, GitError from pkgmgr.git_utils import run_git, GitError, get_current_branch
def open_branch( def open_branch(
@@ -78,3 +79,136 @@ def open_branch(
raise RuntimeError( raise RuntimeError(
f"Failed to push new branch {name!r} to origin: {exc}" f"Failed to push new branch {name!r} to origin: {exc}"
) from exc ) from exc
def _resolve_base_branch(
    preferred: str,
    fallback: str,
    cwd: str,
) -> str:
    """
    Pick the branch that merges should target.

    The `preferred` name (typically "main") is probed first, then
    `fallback` (typically "master"); the first one that
    `git rev-parse --verify` accepts is returned.

    Raises:
        RuntimeError: if neither branch exists in the repository.
    """
    for branch_name in (preferred, fallback):
        try:
            run_git(["rev-parse", "--verify", branch_name], cwd=cwd)
        except GitError:
            # This candidate does not exist; try the next one.
            continue
        return branch_name
    raise RuntimeError(
        f"Neither {preferred!r} nor {fallback!r} exist in this repository."
    )
def close_branch(
    name: Optional[str],
    base_branch: str = "main",
    fallback_base: str = "master",
    cwd: str = ".",
) -> None:
    """
    Merge a feature branch into the base branch and delete it afterwards.

    The branch to close is either `name` or, when omitted, the branch
    currently checked out.  After an interactive confirmation (answering
    anything but 'y' aborts without touching the repository) the function:
    fetches from origin, checks out and pulls the base branch, merges the
    feature branch with --no-ff, pushes the updated base, and finally
    removes the feature branch both locally and on origin.

    Raises:
        RuntimeError: when the branch cannot be determined, the base
            branch cannot be resolved, or any git step fails.
    """

    def _run(cmd, describe):
        # Execute one git step; wrap GitError in a RuntimeError carrying
        # a step-specific message so callers see what exactly failed.
        try:
            run_git(cmd, cwd=cwd)
        except GitError as exc:
            raise RuntimeError(describe(exc)) from exc

    # 1) Determine which branch to close (explicit name or current branch).
    branch = name
    if not branch:
        try:
            branch = get_current_branch(cwd=cwd)
        except GitError as exc:
            raise RuntimeError(f"Failed to detect current branch: {exc}") from exc
    if not branch:
        raise RuntimeError("Branch name must not be empty.")

    # 2) Resolve the merge target (main, falling back to master) and
    #    refuse to "close" the base branch itself.
    target_base = _resolve_base_branch(base_branch, fallback_base, cwd=cwd)
    if branch == target_base:
        raise RuntimeError(
            f"Refusing to close base branch {target_base!r}. "
            "Please specify a feature branch."
        )

    # 3) Interactive safety net: require an explicit 'y' to continue.
    question = (
        f"Merge branch '{branch}' into '{target_base}' and delete it afterwards? "
        "(y/N): "
    )
    if input(question).strip().lower() != "y":
        print("Aborted closing branch.")
        return

    # 4-8) Fetch, switch to base, update it, merge, and publish the result.
    _run(
        ["fetch", "origin"],
        lambda exc: f"Failed to fetch from origin before closing branch {branch!r}: {exc}",
    )
    _run(
        ["checkout", target_base],
        lambda exc: f"Failed to checkout base branch {target_base!r}: {exc}",
    )
    _run(
        ["pull", "origin", target_base],
        lambda exc: f"Failed to pull latest changes for base branch {target_base!r}: {exc}",
    )
    _run(
        ["merge", "--no-ff", branch],
        lambda exc: f"Failed to merge branch {branch!r} into {target_base!r}: {exc}",
    )
    _run(
        ["push", "origin", target_base],
        lambda exc: f"Failed to push base branch {target_base!r} to origin after merge: {exc}",
    )

    # 9) Drop the local feature branch now that it is merged.
    _run(
        ["branch", "-d", branch],
        lambda exc: f"Failed to delete local branch {branch!r} after merge: {exc}",
    )

    # 10) Drop the remote feature branch as well; a failure here is still
    #     surfaced as RuntimeError for clarity.
    _run(
        ["push", "origin", "--delete", branch],
        lambda exc: f"Branch {branch!r} was deleted locally, but remote deletion failed: {exc}",
    )

View File

@@ -9,9 +9,9 @@ import sys
from pkgmgr.load_config import load_config from pkgmgr.load_config import load_config
from pkgmgr.cli_core import CLIContext, create_parser, dispatch_command from pkgmgr.cli_core import CLIContext, create_parser, dispatch_command
# User config lives in the home directory: # Define configuration file paths.
# ~/.config/pkgmgr/config.yaml PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
USER_CONFIG_PATH = os.path.expanduser("~/.config/pkgmgr/config.yaml") USER_CONFIG_PATH = os.path.join(PROJECT_ROOT, "config", "config.yaml")
DESCRIPTION_TEXT = """\ DESCRIPTION_TEXT = """\
\033[1;32mPackage Manager 🤖📦\033[0m \033[1;32mPackage Manager 🤖📦\033[0m
@@ -63,31 +63,20 @@ For detailed help on each command, use:
def main() -> None: def main() -> None:
""" # Load merged configuration
Entry point for the pkgmgr CLI.
"""
config_merged = load_config(USER_CONFIG_PATH) config_merged = load_config(USER_CONFIG_PATH)
# Directories: be robust and provide sane defaults if missing repositories_base_dir = os.path.expanduser(
directories = config_merged.get("directories") or {} config_merged["directories"]["repositories"]
repositories_dir = os.path.expanduser(
directories.get("repositories", "~/Repositories")
) )
binaries_dir = os.path.expanduser( binaries_dir = os.path.expanduser(
directories.get("binaries", "~/.local/bin") config_merged["directories"]["binaries"]
) )
all_repositories = config_merged["repositories"]
# Ensure the merged config actually contains the resolved directories
config_merged.setdefault("directories", {})
config_merged["directories"]["repositories"] = repositories_dir
config_merged["directories"]["binaries"] = binaries_dir
all_repositories = config_merged.get("repositories", [])
ctx = CLIContext( ctx = CLIContext(
config_merged=config_merged, config_merged=config_merged,
repositories_base_dir=repositories_dir, repositories_base_dir=repositories_base_dir,
all_repositories=all_repositories, all_repositories=all_repositories,
binaries_dir=binaries_dir, binaries_dir=binaries_dir,
user_config_path=USER_CONFIG_PATH, user_config_path=USER_CONFIG_PATH,
@@ -96,6 +85,7 @@ def main() -> None:
parser = create_parser(DESCRIPTION_TEXT) parser = create_parser(DESCRIPTION_TEXT)
args = parser.parse_args() args = parser.parse_args()
# If no subcommand is provided, show help
if not getattr(args, "command", None): if not getattr(args, "command", None):
parser.print_help() parser.print_help()
return return

View File

@@ -1,9 +1,10 @@
# pkgmgr/cli_core/commands/branch.py
from __future__ import annotations from __future__ import annotations
import sys import sys
from pkgmgr.cli_core.context import CLIContext from pkgmgr.cli_core.context import CLIContext
from pkgmgr.branch_commands import open_branch from pkgmgr.branch_commands import open_branch, close_branch
def handle_branch(args, ctx: CLIContext) -> None: def handle_branch(args, ctx: CLIContext) -> None:
@@ -11,7 +12,8 @@ def handle_branch(args, ctx: CLIContext) -> None:
Handle `pkgmgr branch` subcommands. Handle `pkgmgr branch` subcommands.
Currently supported: Currently supported:
- pkgmgr branch open [<name>] [--base <branch>] - pkgmgr branch open [<name>] [--base <branch>]
- pkgmgr branch close [<name>] [--base <branch>]
""" """
if args.subcommand == "open": if args.subcommand == "open":
open_branch( open_branch(
@@ -21,5 +23,13 @@ def handle_branch(args, ctx: CLIContext) -> None:
) )
return return
if args.subcommand == "close":
close_branch(
name=getattr(args, "name", None),
base_branch=getattr(args, "base", "main"),
cwd=".",
)
return
print(f"Unknown branch subcommand: {args.subcommand}") print(f"Unknown branch subcommand: {args.subcommand}")
sys.exit(2) sys.exit(2)

View File

@@ -1,13 +1,8 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations from __future__ import annotations
import os import os
import sys import sys
import shutil from typing import Any, Dict, List
from pathlib import Path
from typing import Any, Dict
import yaml import yaml
@@ -22,103 +17,29 @@ from pkgmgr.run_command import run_command
def _load_user_config(user_config_path: str) -> Dict[str, Any]: def _load_user_config(user_config_path: str) -> Dict[str, Any]:
""" """
Load the user config from ~/.config/pkgmgr/config.yaml Load the user config file, returning a default structure if it does not exist.
(or whatever ctx.user_config_path is), creating the directory if needed.
""" """
user_config_path_expanded = os.path.expanduser(user_config_path) if os.path.exists(user_config_path):
cfg_dir = os.path.dirname(user_config_path_expanded) with open(user_config_path, "r") as f:
if cfg_dir and not os.path.isdir(cfg_dir):
os.makedirs(cfg_dir, exist_ok=True)
if os.path.exists(user_config_path_expanded):
with open(user_config_path_expanded, "r", encoding="utf-8") as f:
return yaml.safe_load(f) or {"repositories": []} return yaml.safe_load(f) or {"repositories": []}
return {"repositories": []} return {"repositories": []}
def _find_defaults_source_dir() -> str | None:
    """
    Locate the directory that ships pkgmgr's default config files.

    Candidate locations are probed in this order:
      - <pkg_root>/config_defaults
      - <pkg_root>/config
      - <project_root>/config_defaults
      - <project_root>/config
    The first existing directory is returned as a string; None is
    returned when no candidate exists.
    """
    import pkgmgr  # local import to avoid circular deps

    package_dir = Path(pkgmgr.__file__).resolve().parent
    repo_root = package_dir.parent
    search_order = (
        package_dir / "config_defaults",
        package_dir / "config",
        repo_root / "config_defaults",
        repo_root / "config",
    )
    # Return the first candidate that is an actual directory.
    return next((str(path) for path in search_order if path.is_dir()), None)
def _update_default_configs(user_config_path: str) -> None:
    """
    Sync the packaged default YAML files into the user's config directory.

    Every *.yml / *.yaml file shipped with the pkgmgr installation is
    copied (overwriting existing copies) into the directory holding the
    user config — except config.yaml itself, which contains the user's
    live configuration and is never replaced.
    """
    source_dir = _find_defaults_source_dir()
    if not source_dir:
        print(
            "[WARN] No config_defaults or config directory found in "
            "pkgmgr installation. Nothing to update."
        )
        return

    dest_dir = os.path.dirname(os.path.expanduser(user_config_path))
    if not dest_dir:
        # A bare filename has no directory part; fall back to the
        # standard per-user config location.
        dest_dir = os.path.expanduser("~/.config/pkgmgr")
    os.makedirs(dest_dir, exist_ok=True)

    for entry in os.listdir(source_dir):
        if not entry.lower().endswith((".yml", ".yaml")):
            continue
        if entry == "config.yaml":
            # Never overwrite the user config template / live config.
            continue
        target = os.path.join(dest_dir, entry)
        shutil.copy2(os.path.join(source_dir, entry), target)
        print(f"[INFO] Updated default config file: {target}")
def handle_config(args, ctx: CLIContext) -> None: def handle_config(args, ctx: CLIContext) -> None:
""" """
Handle 'pkgmgr config' subcommands. Handle the 'config' command and its subcommands.
""" """
user_config_path = ctx.user_config_path user_config_path = ctx.user_config_path
# ------------------------------------------------------------ # --------------------------------------------------------
# config show # config show
# ------------------------------------------------------------ # --------------------------------------------------------
if args.subcommand == "show": if args.subcommand == "show":
if args.all or (not args.identifiers): if args.all or (not args.identifiers):
# Full merged config view
show_config([], user_config_path, full_config=True) show_config([], user_config_path, full_config=True)
else: else:
# Show only matching entries from user config selected = resolve_repos(args.identifiers, ctx.all_repositories)
user_config = _load_user_config(user_config_path)
selected = resolve_repos(
args.identifiers,
user_config.get("repositories", []),
)
if selected: if selected:
show_config( show_config(
selected, selected,
@@ -127,23 +48,23 @@ def handle_config(args, ctx: CLIContext) -> None:
) )
return return
# ------------------------------------------------------------ # --------------------------------------------------------
# config add # config add
# ------------------------------------------------------------ # --------------------------------------------------------
if args.subcommand == "add": if args.subcommand == "add":
interactive_add(ctx.config_merged, user_config_path) interactive_add(ctx.config_merged, user_config_path)
return return
# ------------------------------------------------------------ # --------------------------------------------------------
# config edit # config edit
# ------------------------------------------------------------ # --------------------------------------------------------
if args.subcommand == "edit": if args.subcommand == "edit":
run_command(f"nano {user_config_path}") run_command(f"nano {user_config_path}")
return return
# ------------------------------------------------------------ # --------------------------------------------------------
# config init # config init
# ------------------------------------------------------------ # --------------------------------------------------------
if args.subcommand == "init": if args.subcommand == "init":
user_config = _load_user_config(user_config_path) user_config = _load_user_config(user_config_path)
config_init( config_init(
@@ -154,17 +75,14 @@ def handle_config(args, ctx: CLIContext) -> None:
) )
return return
# ------------------------------------------------------------ # --------------------------------------------------------
# config delete # config delete
# ------------------------------------------------------------ # --------------------------------------------------------
if args.subcommand == "delete": if args.subcommand == "delete":
user_config = _load_user_config(user_config_path) user_config = _load_user_config(user_config_path)
if args.all or not args.identifiers: if args.all or not args.identifiers:
print( print("You must specify identifiers to delete.")
"[ERROR] 'config delete' requires explicit identifiers. "
"Use 'config show' to inspect entries."
)
return return
to_delete = resolve_repos( to_delete = resolve_repos(
@@ -181,17 +99,14 @@ def handle_config(args, ctx: CLIContext) -> None:
print(f"Deleted {len(to_delete)} entries from user config.") print(f"Deleted {len(to_delete)} entries from user config.")
return return
# ------------------------------------------------------------ # --------------------------------------------------------
# config ignore # config ignore
# ------------------------------------------------------------ # --------------------------------------------------------
if args.subcommand == "ignore": if args.subcommand == "ignore":
user_config = _load_user_config(user_config_path) user_config = _load_user_config(user_config_path)
if args.all or not args.identifiers: if args.all or not args.identifiers:
print( print("You must specify identifiers to modify ignore flag.")
"[ERROR] 'config ignore' requires explicit identifiers. "
"Use 'config show' to inspect entries."
)
return return
to_modify = resolve_repos( to_modify = resolve_repos(
@@ -220,21 +135,6 @@ def handle_config(args, ctx: CLIContext) -> None:
save_user_config(user_config, user_config_path) save_user_config(user_config, user_config_path)
return return
# ------------------------------------------------------------ # If we end up here, something is wrong with subcommand routing
# config update
# ------------------------------------------------------------
if args.subcommand == "update":
"""
Copy default YAML configs from the installed package into the
user's ~/.config/pkgmgr directory.
This will overwrite files with the same name (except config.yaml).
"""
_update_default_configs(user_config_path)
return
# ------------------------------------------------------------
# Unknown subcommand
# ------------------------------------------------------------
print(f"Unknown config subcommand: {args.subcommand}") print(f"Unknown config subcommand: {args.subcommand}")
sys.exit(2) sys.exit(2)

View File

@@ -1,12 +1,31 @@
# pkgmgr/cli_core/commands/release.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Release command wiring for the pkgmgr CLI.
This module implements the `pkgmgr release` subcommand on top of the
generic selection logic from cli_core.dispatch. It does not define its
own subparser; the CLI surface is configured in cli_core.parser.
Responsibilities:
- Take the parsed argparse.Namespace for the `release` command.
- Use the list of selected repositories provided by dispatch_command().
- Optionally list affected repositories when --list is set.
- For each selected repository, run pkgmgr.release.release(...) in
the context of that repository directory.
"""
from __future__ import annotations from __future__ import annotations
import os import os
import sys from typing import Any, Dict, List
from typing import Any, Dict, List, Optional
from pkgmgr.cli_core.context import CLIContext from pkgmgr.cli_core.context import CLIContext
from pkgmgr.get_repo_dir import get_repo_dir from pkgmgr.get_repo_dir import get_repo_dir
from pkgmgr import release as rel from pkgmgr.get_repo_identifier import get_repo_identifier
from pkgmgr.release import release as run_release
Repository = Dict[str, Any] Repository = Dict[str, Any]
@@ -18,59 +37,63 @@ def handle_release(
selected: List[Repository], selected: List[Repository],
) -> None: ) -> None:
""" """
Handle the 'release' command. Handle the `pkgmgr release` subcommand.
Creates a release by incrementing the version and updating the changelog Flow:
in a single selected repository. 1) Use the `selected` repositories as computed by dispatch_command().
2) If --list is given, print the identifiers of the selected repos
Important: and return without running any release.
- Releases are strictly limited to exactly ONE repository. 3) For each selected repository:
- Using --all or specifying multiple identifiers for release does - Resolve its identifier and local directory.
not make sense and is therefore rejected. - Change into that directory.
- The --preview flag is respected and passed through to the release - Call pkgmgr.release.release(...) with the parsed options.
implementation so that no changes are made in preview mode.
""" """
if not selected: if not selected:
print("No repositories selected for release.") print("[pkgmgr] No repositories selected for release.")
sys.exit(1) return
# List-only mode: show which repositories would be affected.
if getattr(args, "list", False):
print("[pkgmgr] Repositories that would be affected by this release:")
for repo in selected:
identifier = get_repo_identifier(repo, ctx.all_repositories)
print(f" - {identifier}")
return
for repo in selected:
identifier = get_repo_identifier(repo, ctx.all_repositories)
repo_dir = repo.get("directory")
if not repo_dir:
try:
repo_dir = get_repo_dir(ctx.repositories_base_dir, repo)
except Exception:
repo_dir = None
if not repo_dir or not os.path.isdir(repo_dir):
print(
f"[WARN] Skipping repository {identifier}: "
"local directory does not exist."
)
continue
if len(selected) > 1:
print( print(
"[ERROR] Release operations are limited to a single repository.\n" f"[pkgmgr] Running release for repository {identifier} "
"Do not use --all or multiple identifiers with 'pkgmgr release'." f"in '{repo_dir}'..."
) )
sys.exit(1)
original_dir = os.getcwd() # Change to repo directory and invoke the helper.
cwd_before = os.getcwd()
repo = selected[0] try:
os.chdir(repo_dir)
repo_dir: Optional[str] = repo.get("directory") run_release(
if not repo_dir: pyproject_path="pyproject.toml",
repo_dir = get_repo_dir(ctx.repositories_base_dir, repo) changelog_path="CHANGELOG.md",
release_type=args.release_type,
if not os.path.isdir(repo_dir): message=args.message or None,
print( preview=getattr(args, "preview", False),
f"[ERROR] Repository directory does not exist locally: {repo_dir}" force=getattr(args, "force", False),
) close=getattr(args, "close", False),
sys.exit(1) )
finally:
pyproject_path = os.path.join(repo_dir, "pyproject.toml") os.chdir(cwd_before)
changelog_path = os.path.join(repo_dir, "CHANGELOG.md")
print(
f"Releasing repository '{repo.get('repository')}' in '{repo_dir}'..."
)
os.chdir(repo_dir)
try:
rel.release(
pyproject_path=pyproject_path,
changelog_path=changelog_path,
release_type=args.release_type,
message=args.message,
preview=getattr(args, "preview", False),
)
finally:
os.chdir(original_dir)

View File

@@ -1,6 +1,3 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations from __future__ import annotations
import sys import sys
@@ -15,7 +12,7 @@ from pkgmgr.status_repos import status_repos
from pkgmgr.list_repositories import list_repositories from pkgmgr.list_repositories import list_repositories
from pkgmgr.run_command import run_command from pkgmgr.run_command import run_command
from pkgmgr.create_repo import create_repo from pkgmgr.create_repo import create_repo
from pkgmgr.get_selected_repos import get_selected_repos
Repository = Dict[str, Any] Repository = Dict[str, Any]
@@ -26,12 +23,15 @@ def handle_repos_command(
selected: List[Repository], selected: List[Repository],
) -> None: ) -> None:
""" """
Handle core repository commands (install/update/deinstall/delete/.../list). Handle repository-related commands:
- install / update / deinstall / delete / status
- path / shell
- create / list
""" """
# ------------------------------------------------------------ # --------------------------------------------------------
# install # install / update
# ------------------------------------------------------------ # --------------------------------------------------------
if args.command == "install": if args.command == "install":
install_repos( install_repos(
selected, selected,
@@ -46,9 +46,6 @@ def handle_repos_command(
) )
return return
# ------------------------------------------------------------
# update
# ------------------------------------------------------------
if args.command == "update": if args.command == "update":
update_repos( update_repos(
selected, selected,
@@ -64,9 +61,9 @@ def handle_repos_command(
) )
return return
# ------------------------------------------------------------ # --------------------------------------------------------
# deinstall # deinstall / delete
# ------------------------------------------------------------ # --------------------------------------------------------
if args.command == "deinstall": if args.command == "deinstall":
deinstall_repos( deinstall_repos(
selected, selected,
@@ -77,9 +74,6 @@ def handle_repos_command(
) )
return return
# ------------------------------------------------------------
# delete
# ------------------------------------------------------------
if args.command == "delete": if args.command == "delete":
delete_repos( delete_repos(
selected, selected,
@@ -89,9 +83,9 @@ def handle_repos_command(
) )
return return
# ------------------------------------------------------------ # --------------------------------------------------------
# status # status
# ------------------------------------------------------------ # --------------------------------------------------------
if args.command == "status": if args.command == "status":
status_repos( status_repos(
selected, selected,
@@ -104,20 +98,20 @@ def handle_repos_command(
) )
return return
# ------------------------------------------------------------ # --------------------------------------------------------
# path # path
# ------------------------------------------------------------ # --------------------------------------------------------
if args.command == "path": if args.command == "path":
for repository in selected: for repository in selected:
print(repository["directory"]) print(repository["directory"])
return return
# ------------------------------------------------------------ # --------------------------------------------------------
# shell # shell
# ------------------------------------------------------------ # --------------------------------------------------------
if args.command == "shell": if args.command == "shell":
if not args.shell_command: if not args.shell_command:
print("[ERROR] 'shell' requires a command via -c/--command.") print("No shell command specified.")
sys.exit(2) sys.exit(2)
command_to_run = " ".join(args.shell_command) command_to_run = " ".join(args.shell_command)
for repository in selected: for repository in selected:
@@ -131,13 +125,13 @@ def handle_repos_command(
) )
return return
# ------------------------------------------------------------ # --------------------------------------------------------
# create # create
# ------------------------------------------------------------ # --------------------------------------------------------
if args.command == "create": if args.command == "create":
if not args.identifiers: if not args.identifiers:
print( print(
"[ERROR] 'create' requires at least one identifier " "No identifiers provided. Please specify at least one identifier "
"in the format provider/account/repository." "in the format provider/account/repository."
) )
sys.exit(1) sys.exit(1)
@@ -153,19 +147,15 @@ def handle_repos_command(
) )
return return
# ------------------------------------------------------------ # --------------------------------------------------------
# list # list
# ------------------------------------------------------------ # --------------------------------------------------------
if args.command == "list": if args.command == "list":
list_repositories( list_repositories(
selected, ctx.all_repositories,
ctx.repositories_base_dir, ctx.repositories_base_dir,
ctx.binaries_dir, ctx.binaries_dir,
status_filter=getattr(args, "status", "") or "", search_filter=args.search,
extra_tags=getattr(args, "tag", []) or [], status_filter=args.status,
show_description=getattr(args, "description", False),
) )
return return
print(f"[ERROR] Unknown repos command: {args.command}")
sys.exit(2)

View File

@@ -1,6 +1,3 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations from __future__ import annotations
import sys import sys
@@ -24,14 +21,18 @@ from pkgmgr.cli_core.commands import (
def dispatch_command(args, ctx: CLIContext) -> None: def dispatch_command(args, ctx: CLIContext) -> None:
""" """
Dispatch the parsed arguments to the appropriate command handler. Top-level command dispatcher.
Responsible for:
- computing selected repositories (where applicable)
- delegating to the correct command handler module
""" """
# First: proxy commands (git / docker / docker compose / make wrapper etc.) # 1) Proxy commands (git, docker, docker compose) short-circuit.
if maybe_handle_proxy(args, ctx): if maybe_handle_proxy(args, ctx):
return return
# Commands that operate on repository selections # 2) Determine if this command uses repository selection.
commands_with_selection: List[str] = [ commands_with_selection: List[str] = [
"install", "install",
"update", "update",
@@ -40,25 +41,26 @@ def dispatch_command(args, ctx: CLIContext) -> None:
"status", "status",
"path", "path",
"shell", "shell",
"create", "code",
"list",
"make",
"release",
"version",
"changelog",
"explore", "explore",
"terminal", "terminal",
"code", "release",
"version",
"make",
"changelog",
# intentionally NOT "branch" it operates on cwd only
] ]
if getattr(args, "command", None) in commands_with_selection: if args.command in commands_with_selection:
selected = get_selected_repos(args, ctx.all_repositories) selected = get_selected_repos(
getattr(args, "all", False),
ctx.all_repositories,
getattr(args, "identifiers", []),
)
else: else:
selected = [] selected = []
# ------------------------------------------------------------------ # # 3) Delegate based on command.
# Repos-related commands
# ------------------------------------------------------------------ #
if args.command in ( if args.command in (
"install", "install",
"update", "update",
@@ -71,41 +73,22 @@ def dispatch_command(args, ctx: CLIContext) -> None:
"list", "list",
): ):
handle_repos_command(args, ctx, selected) handle_repos_command(args, ctx, selected)
return elif args.command in ("code", "explore", "terminal"):
# ------------------------------------------------------------------ #
# Tools (explore / terminal / code)
# ------------------------------------------------------------------ #
if args.command in ("explore", "terminal", "code"):
handle_tools_command(args, ctx, selected) handle_tools_command(args, ctx, selected)
return elif args.command == "release":
# ------------------------------------------------------------------ #
# Release / Version / Changelog / Config / Make / Branch
# ------------------------------------------------------------------ #
if args.command == "release":
handle_release(args, ctx, selected) handle_release(args, ctx, selected)
return elif args.command == "version":
if args.command == "version":
handle_version(args, ctx, selected) handle_version(args, ctx, selected)
return elif args.command == "changelog":
if args.command == "changelog":
handle_changelog(args, ctx, selected) handle_changelog(args, ctx, selected)
return elif args.command == "config":
if args.command == "config":
handle_config(args, ctx) handle_config(args, ctx)
return elif args.command == "make":
if args.command == "make":
handle_make(args, ctx, selected) handle_make(args, ctx, selected)
return elif args.command == "branch":
# Branch commands currently operate on the current working
if args.command == "branch": # directory only, not on the pkgmgr repository selection.
handle_branch(args, ctx) handle_branch(args, ctx)
return else:
print(f"Unknown command: {args.command}")
print(f"Unknown command: {args.command}") sys.exit(2)
sys.exit(2)

View File

@@ -1,6 +1,3 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations from __future__ import annotations
import argparse import argparse
@@ -15,20 +12,13 @@ class SortedSubParsersAction(argparse._SubParsersAction):
def add_parser(self, name, **kwargs): def add_parser(self, name, **kwargs):
parser = super().add_parser(name, **kwargs) parser = super().add_parser(name, **kwargs)
# Sort choices alphabetically by dest (subcommand name)
self._choices_actions.sort(key=lambda a: a.dest) self._choices_actions.sort(key=lambda a: a.dest)
return parser return parser
def add_identifier_arguments(subparser: argparse.ArgumentParser) -> None: def add_identifier_arguments(subparser: argparse.ArgumentParser) -> None:
""" """
Common identifier / selection arguments for many subcommands. Attach generic repository selection arguments to a subparser.
Selection modes (mutual intent, not hard-enforced):
- identifiers (positional): select by alias / provider/account/repo
- --all: select all repositories
- --category / --string / --tag: filter-based selection on top
of the full repository set
""" """
subparser.add_argument( subparser.add_argument(
"identifiers", "identifiers",
@@ -49,33 +39,6 @@ def add_identifier_arguments(subparser: argparse.ArgumentParser) -> None:
"yes | pkgmgr {subcommand} --all" "yes | pkgmgr {subcommand} --all"
), ),
) )
subparser.add_argument(
"--category",
nargs="+",
default=[],
help=(
"Filter repositories by category patterns derived from config "
"filenames or repo metadata (use filename without .yml/.yaml, "
"or /regex/ to use a regular expression)."
),
)
subparser.add_argument(
"--string",
default="",
help=(
"Filter repositories whose identifier / name / path contains this "
"substring (case-insensitive). Use /regex/ for regular expressions."
),
)
subparser.add_argument(
"--tag",
action="append",
default=[],
help=(
"Filter repositories by tag. Matches tags from the repository "
"collector and category tags. Use /regex/ for regular expressions."
),
)
subparser.add_argument( subparser.add_argument(
"--preview", "--preview",
action="store_true", action="store_true",
@@ -98,7 +61,7 @@ def add_identifier_arguments(subparser: argparse.ArgumentParser) -> None:
def add_install_update_arguments(subparser: argparse.ArgumentParser) -> None: def add_install_update_arguments(subparser: argparse.ArgumentParser) -> None:
""" """
Common arguments for install/update commands. Attach shared flags for install/update-like commands.
""" """
add_identifier_arguments(subparser) add_identifier_arguments(subparser)
subparser.add_argument( subparser.add_argument(
@@ -131,7 +94,10 @@ def add_install_update_arguments(subparser: argparse.ArgumentParser) -> None:
def create_parser(description_text: str) -> argparse.ArgumentParser: def create_parser(description_text: str) -> argparse.ArgumentParser:
""" """
Create the top-level argument parser for pkgmgr. Create and configure the top-level argument parser for pkgmgr.
This function defines *only* the CLI surface (arguments & subcommands),
but no business logic.
""" """
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description=description_text, description=description_text,
@@ -144,7 +110,7 @@ def create_parser(description_text: str) -> argparse.ArgumentParser:
) )
# ------------------------------------------------------------ # ------------------------------------------------------------
# install / update / deinstall / delete # install / update
# ------------------------------------------------------------ # ------------------------------------------------------------
install_parser = subparsers.add_parser( install_parser = subparsers.add_parser(
"install", "install",
@@ -163,6 +129,9 @@ def create_parser(description_text: str) -> argparse.ArgumentParser:
help="Include system update commands", help="Include system update commands",
) )
# ------------------------------------------------------------
# deinstall / delete
# ------------------------------------------------------------
deinstall_parser = subparsers.add_parser( deinstall_parser = subparsers.add_parser(
"deinstall", "deinstall",
help="Remove alias links to repository/repositories", help="Remove alias links to repository/repositories",
@@ -178,7 +147,7 @@ def create_parser(description_text: str) -> argparse.ArgumentParser:
# ------------------------------------------------------------ # ------------------------------------------------------------
# create # create
# ------------------------------------------------------------ # ------------------------------------------------------------
create_cmd_parser = subparsers.add_parser( create_parser = subparsers.add_parser(
"create", "create",
help=( help=(
"Create new repository entries: add them to the config if not " "Create new repository entries: add them to the config if not "
@@ -186,8 +155,8 @@ def create_parser(description_text: str) -> argparse.ArgumentParser:
"remotely if --remote is set." "remotely if --remote is set."
), ),
) )
add_identifier_arguments(create_cmd_parser) add_identifier_arguments(create_parser)
create_cmd_parser.add_argument( create_parser.add_argument(
"--remote", "--remote",
action="store_true", action="store_true",
help="If set, add the remote and push the initial commit.", help="If set, add the remote and push the initial commit.",
@@ -259,14 +228,6 @@ def create_parser(description_text: str) -> argparse.ArgumentParser:
help="Set ignore to true or false", help="Set ignore to true or false",
) )
config_subparsers.add_parser(
"update",
help=(
"Update default config files in ~/.config/pkgmgr/ from the "
"installed pkgmgr package (does not touch config.yaml)."
),
)
# ------------------------------------------------------------ # ------------------------------------------------------------
# path / explore / terminal / code / shell # path / explore / terminal / code / shell
# ------------------------------------------------------------ # ------------------------------------------------------------
@@ -304,10 +265,7 @@ def create_parser(description_text: str) -> argparse.ArgumentParser:
"--command", "--command",
nargs=argparse.REMAINDER, nargs=argparse.REMAINDER,
dest="shell_command", dest="shell_command",
help=( help="The shell command (and its arguments) to execute in each repository",
"The shell command (and its arguments) to execute in each "
"repository"
),
default=[], default=[],
) )
@@ -331,10 +289,7 @@ def create_parser(description_text: str) -> argparse.ArgumentParser:
branch_open.add_argument( branch_open.add_argument(
"name", "name",
nargs="?", nargs="?",
help=( help="Name of the new branch (optional; will be asked interactively if omitted)",
"Name of the new branch (optional; will be asked interactively "
"if omitted)"
),
) )
branch_open.add_argument( branch_open.add_argument(
"--base", "--base",
@@ -360,10 +315,32 @@ def create_parser(description_text: str) -> argparse.ArgumentParser:
release_parser.add_argument( release_parser.add_argument(
"-m", "-m",
"--message", "--message",
default="", default=None,
help="Optional release message to add to the changelog and tag.", help=(
"Optional release message to add to the changelog and tag."
),
) )
# Generic selection / preview / list / extra_args
add_identifier_arguments(release_parser) add_identifier_arguments(release_parser)
# Close current branch after successful release
release_parser.add_argument(
"--close",
action="store_true",
help=(
"Close the current branch after a successful release in each "
"repository, if it is not main/master."
),
)
# Force: skip preview+confirmation and run release directly
release_parser.add_argument(
"-f",
"--force",
action="store_true",
help=(
"Skip the interactive preview+confirmation step and run the "
"release directly."
),
)
# ------------------------------------------------------------ # ------------------------------------------------------------
# version # version
@@ -372,8 +349,7 @@ def create_parser(description_text: str) -> argparse.ArgumentParser:
"version", "version",
help=( help=(
"Show version information for repository/ies " "Show version information for repository/ies "
"(git tags, pyproject.toml, flake.nix, PKGBUILD, debian, spec, " "(git tags, pyproject.toml, flake.nix, PKGBUILD, debian, spec, Ansible Galaxy)."
"Ansible Galaxy)."
), ),
) )
add_identifier_arguments(version_parser) add_identifier_arguments(version_parser)
@@ -407,29 +383,20 @@ def create_parser(description_text: str) -> argparse.ArgumentParser:
"list", "list",
help="List all repositories with details and status", help="List all repositories with details and status",
) )
# dieselbe Selektionslogik wie bei install/update/etc.: list_parser.add_argument(
add_identifier_arguments(list_parser) "--search",
default="",
help="Filter repositories that contain the given string",
)
list_parser.add_argument( list_parser.add_argument(
"--status", "--status",
type=str, type=str,
default="", default="",
help=( help="Filter repositories by status (case insensitive)",
"Filter repositories by status (case insensitive). "
"Use /regex/ for regular expressions."
),
) )
list_parser.add_argument(
"--description",
action="store_true",
help=(
"Show an additional detailed section per repository "
"(description, homepage, tags, categories, paths)."
),
)
# ------------------------------------------------------------ # ------------------------------------------------------------
# make # make (wrapper around make in repositories)
# ------------------------------------------------------------ # ------------------------------------------------------------
make_parser = subparsers.add_parser( make_parser = subparsers.add_parser(
"make", "make",
@@ -455,7 +422,7 @@ def create_parser(description_text: str) -> argparse.ArgumentParser:
add_identifier_arguments(make_deinstall) add_identifier_arguments(make_deinstall)
# ------------------------------------------------------------ # ------------------------------------------------------------
# Proxy commands (git, docker, docker compose, ...) # Proxy commands (git, docker, docker compose)
# ------------------------------------------------------------ # ------------------------------------------------------------
register_proxy_commands(subparsers) register_proxy_commands(subparsers)

View File

@@ -1,6 +1,3 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations from __future__ import annotations
import argparse import argparse
@@ -10,8 +7,8 @@ from typing import Dict, List
from pkgmgr.cli_core.context import CLIContext from pkgmgr.cli_core.context import CLIContext
from pkgmgr.clone_repos import clone_repos from pkgmgr.clone_repos import clone_repos
from pkgmgr.exec_proxy_command import exec_proxy_command from pkgmgr.exec_proxy_command import exec_proxy_command
from pkgmgr.pull_with_verification import pull_with_verification
from pkgmgr.get_selected_repos import get_selected_repos from pkgmgr.get_selected_repos import get_selected_repos
from pkgmgr.pull_with_verification import pull_with_verification
PROXY_COMMANDS: Dict[str, List[str]] = { PROXY_COMMANDS: Dict[str, List[str]] = {
@@ -45,7 +42,10 @@ PROXY_COMMANDS: Dict[str, List[str]] = {
def _add_proxy_identifier_arguments(parser: argparse.ArgumentParser) -> None: def _add_proxy_identifier_arguments(parser: argparse.ArgumentParser) -> None:
""" """
Selection arguments for proxy subcommands. Local copy of the identifier argument set for proxy commands.
This duplicates the semantics of cli.parser.add_identifier_arguments
to avoid circular imports.
""" """
parser.add_argument( parser.add_argument(
"identifiers", "identifiers",
@@ -66,24 +66,6 @@ def _add_proxy_identifier_arguments(parser: argparse.ArgumentParser) -> None:
"yes | pkgmgr {subcommand} --all" "yes | pkgmgr {subcommand} --all"
), ),
) )
parser.add_argument(
"--category",
nargs="+",
default=[],
help=(
"Filter repositories by category patterns derived from config "
"filenames or repo metadata (use filename without .yml/.yaml, "
"or /regex/ to use a regular expression)."
),
)
parser.add_argument(
"--string",
default="",
help=(
"Filter repositories whose identifier / name / path contains this "
"substring (case-insensitive). Use /regex/ for regular expressions."
),
)
parser.add_argument( parser.add_argument(
"--preview", "--preview",
action="store_true", action="store_true",
@@ -108,7 +90,8 @@ def register_proxy_commands(
subparsers: argparse._SubParsersAction, subparsers: argparse._SubParsersAction,
) -> None: ) -> None:
""" """
Register proxy subcommands for git, docker, docker compose, ... Register proxy commands (git, docker, docker compose) as
top-level subcommands on the given subparsers.
""" """
for command, subcommands in PROXY_COMMANDS.items(): for command, subcommands in PROXY_COMMANDS.items():
for subcommand in subcommands: for subcommand in subcommands:
@@ -117,8 +100,7 @@ def register_proxy_commands(
help=f"Proxies '{command} {subcommand}' to repository/ies", help=f"Proxies '{command} {subcommand}' to repository/ies",
description=( description=(
f"Executes '{command} {subcommand}' for the " f"Executes '{command} {subcommand}' for the "
"selected repositories. " "identified repos.\nTo recieve more help execute "
"For more details see the underlying tool's help: "
f"'{command} {subcommand} --help'" f"'{command} {subcommand} --help'"
), ),
formatter_class=argparse.RawTextHelpFormatter, formatter_class=argparse.RawTextHelpFormatter,
@@ -147,8 +129,8 @@ def register_proxy_commands(
def maybe_handle_proxy(args: argparse.Namespace, ctx: CLIContext) -> bool: def maybe_handle_proxy(args: argparse.Namespace, ctx: CLIContext) -> bool:
""" """
If the top-level command is one of the proxy subcommands If the parsed command is a proxy command, execute it and return True.
(git / docker / docker compose), handle it here and return True. Otherwise return False to let the main dispatcher continue.
""" """
all_proxy_subcommands = { all_proxy_subcommands = {
sub for subs in PROXY_COMMANDS.values() for sub in subs sub for subs in PROXY_COMMANDS.values() for sub in subs
@@ -157,7 +139,12 @@ def maybe_handle_proxy(args: argparse.Namespace, ctx: CLIContext) -> bool:
if args.command not in all_proxy_subcommands: if args.command not in all_proxy_subcommands:
return False return False
selected = get_selected_repos(args, ctx.all_repositories) # Use generic selection semantics for proxies
selected = get_selected_repos(
getattr(args, "all", False),
ctx.all_repositories,
getattr(args, "identifiers", []),
)
for command, subcommands in PROXY_COMMANDS.items(): for command, subcommands in PROXY_COMMANDS.items():
if args.command not in subcommands: if args.command not in subcommands:
@@ -194,4 +181,4 @@ def maybe_handle_proxy(args: argparse.Namespace, ctx: CLIContext) -> bool:
sys.exit(0) sys.exit(0)
return True return True

View File

@@ -1,122 +1,46 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Initialize user configuration by scanning the repositories base directory.
This module scans the path:
defaults_config["directories"]["repositories"]
with the expected structure:
{base}/{provider}/{account}/{repository}
For each discovered repository, the function:
• derives provider, account, repository from the folder structure
• (optionally) determines the latest commit hash via git log
• generates a unique CLI alias
• marks ignore=True for newly discovered repos
• skips repos already known in defaults or user config
"""
from __future__ import annotations
import os
import subprocess import subprocess
from typing import Any, Dict import os
from pkgmgr.generate_alias import generate_alias from pkgmgr.generate_alias import generate_alias
from pkgmgr.save_user_config import save_user_config from pkgmgr.save_user_config import save_user_config
def config_init(user_config, defaults_config, bin_dir,USER_CONFIG_PATH:str):
def config_init(
user_config: Dict[str, Any],
defaults_config: Dict[str, Any],
bin_dir: str,
user_config_path: str,
) -> None:
""" """
Scan the repositories base directory and add missing entries Scan the base directory (defaults_config["base"]) for repositories.
to the user configuration. The folder structure is assumed to be:
{base}/{provider}/{account}/{repository}
For each repository found, automatically determine:
- provider, account, repository from folder names.
- verified: the latest commit (via 'git log -1 --format=%H').
- alias: generated from the repository name using generate_alias().
Repositories already defined in defaults_config["repositories"] or user_config["repositories"] are skipped.
""" """
repositories_base_dir = os.path.expanduser(defaults_config["directories"]["repositories"])
# ------------------------------------------------------------
# Announce where we will write the result
# ------------------------------------------------------------
print("============================================================")
print(f"[INIT] Writing user configuration to:")
print(f" {user_config_path}")
print("============================================================")
repositories_base_dir = os.path.expanduser(
defaults_config["directories"]["repositories"]
)
print(f"[INIT] Scanning repository base directory:")
print(f" {repositories_base_dir}")
print("")
if not os.path.isdir(repositories_base_dir): if not os.path.isdir(repositories_base_dir):
print(f"[ERROR] Base directory does not exist: {repositories_base_dir}") print(f"Base directory '{repositories_base_dir}' does not exist.")
return return
default_keys = { default_keys = {(entry.get("provider"), entry.get("account"), entry.get("repository"))
(entry.get("provider"), entry.get("account"), entry.get("repository")) for entry in defaults_config.get("repositories", [])}
for entry in defaults_config.get("repositories", []) existing_keys = {(entry.get("provider"), entry.get("account"), entry.get("repository"))
} for entry in user_config.get("repositories", [])}
existing_keys = { existing_aliases = {entry.get("alias") for entry in user_config.get("repositories", []) if entry.get("alias")}
(entry.get("provider"), entry.get("account"), entry.get("repository"))
for entry in user_config.get("repositories", [])
}
existing_aliases = {
entry.get("alias")
for entry in user_config.get("repositories", [])
if entry.get("alias")
}
new_entries = [] new_entries = []
scanned = 0
skipped = 0
# ------------------------------------------------------------
# Actual scanning
# ------------------------------------------------------------
for provider in os.listdir(repositories_base_dir): for provider in os.listdir(repositories_base_dir):
provider_path = os.path.join(repositories_base_dir, provider) provider_path = os.path.join(repositories_base_dir, provider)
if not os.path.isdir(provider_path): if not os.path.isdir(provider_path):
continue continue
print(f"[SCAN] Provider: {provider}")
for account in os.listdir(provider_path): for account in os.listdir(provider_path):
account_path = os.path.join(provider_path, account) account_path = os.path.join(provider_path, account)
if not os.path.isdir(account_path): if not os.path.isdir(account_path):
continue continue
print(f"[SCAN] Account: {account}")
for repo_name in os.listdir(account_path): for repo_name in os.listdir(account_path):
repo_path = os.path.join(account_path, repo_name) repo_path = os.path.join(account_path, repo_name)
if not os.path.isdir(repo_path): if not os.path.isdir(repo_path):
continue continue
scanned += 1
key = (provider, account, repo_name) key = (provider, account, repo_name)
if key in default_keys or key in existing_keys:
# Already known?
if key in default_keys:
skipped += 1
print(f"[SKIP] (defaults) {provider}/{account}/{repo_name}")
continue continue
if key in existing_keys:
skipped += 1
print(f"[SKIP] (user-config) {provider}/{account}/{repo_name}")
continue
print(f"[ADD] {provider}/{account}/{repo_name}")
# Determine commit hash
try: try:
result = subprocess.run( result = subprocess.run(
["git", "log", "-1", "--format=%H"], ["git", "log", "-1", "--format=%H"],
@@ -127,55 +51,25 @@ def config_init(
check=True, check=True,
) )
verified = result.stdout.strip() verified = result.stdout.strip()
print(f"[INFO] Latest commit: {verified}") except Exception as e:
except Exception as exc:
verified = "" verified = ""
print(f"[WARN] Could not read commit: {exc}") print(f"Could not determine latest commit for {repo_name} ({provider}/{account}): {e}")
entry = { entry = {
"provider": provider, "provider": provider,
"account": account, "account": account,
"repository": repo_name, "repository": repo_name,
"verified": {"commit": verified}, "verified": {"commit": verified},
"ignore": True, "ignore": True
} }
alias = generate_alias({"repository": repo_name, "provider": provider, "account": account}, bin_dir, existing_aliases)
# Alias generation
alias = generate_alias(
{
"repository": repo_name,
"provider": provider,
"account": account,
},
bin_dir,
existing_aliases,
)
entry["alias"] = alias entry["alias"] = alias
existing_aliases.add(alias) existing_aliases.add(alias)
print(f"[INFO] Alias generated: {alias}")
new_entries.append(entry) new_entries.append(entry)
print(f"Adding new repo entry: {entry}")
print("") # blank line between accounts
# ------------------------------------------------------------
# Summary
# ------------------------------------------------------------
print("============================================================")
print(f"[DONE] Scanned repositories: {scanned}")
print(f"[DONE] Skipped (known): {skipped}")
print(f"[DONE] New entries discovered: {len(new_entries)}")
print("============================================================")
# ------------------------------------------------------------
# Save if needed
# ------------------------------------------------------------
if new_entries: if new_entries:
user_config.setdefault("repositories", []).extend(new_entries) user_config.setdefault("repositories", []).extend(new_entries)
save_user_config(user_config, user_config_path) save_user_config(user_config,USER_CONFIG_PATH)
print(f"[SAVE] Wrote user configuration to:")
print(f" {user_config_path}")
else: else:
print("[INFO] No new repositories were added.") print("No new repositories found.")
print("============================================================")

View File

@@ -1,170 +1,29 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import os import os
import re import sys
from typing import Any, Dict, List, Sequence from .resolve_repos import resolve_repos
from .filter_ignored import filter_ignored
from pkgmgr.resolve_repos import resolve_repos from .get_repo_dir import get_repo_dir
Repository = Dict[str, Any]
def _compile_maybe_regex(pattern: str):
"""
If pattern is of the form /.../, return a compiled regex (case-insensitive).
Otherwise return None.
"""
if len(pattern) >= 2 and pattern.startswith("/") and pattern.endswith("/"):
try:
return re.compile(pattern[1:-1], re.IGNORECASE)
except re.error:
return None
return None
def _match_pattern(value: str, pattern: str) -> bool:
"""
Match a value against a pattern that may be a substring or /regex/.
"""
if not pattern:
return True
regex = _compile_maybe_regex(pattern)
if regex:
return bool(regex.search(value))
return pattern.lower() in value.lower()
def _match_any(values: Sequence[str], pattern: str) -> bool:
"""
Return True if any of the values matches the pattern.
"""
for v in values:
if _match_pattern(v, pattern):
return True
return False
def _build_identifier_string(repo: Repository) -> str:
"""
Build a combined identifier string for string-based filtering.
"""
provider = str(repo.get("provider", ""))
account = str(repo.get("account", ""))
repository = str(repo.get("repository", ""))
alias = str(repo.get("alias", ""))
description = str(repo.get("description", ""))
directory = str(repo.get("directory", ""))
parts = [
provider,
account,
repository,
alias,
f"{provider}/{account}/{repository}",
description,
directory,
]
return " ".join(p for p in parts if p)
def _apply_filters(
repos: List[Repository],
string_pattern: str,
category_patterns: List[str],
tag_patterns: List[str],
) -> List[Repository]:
if not string_pattern and not category_patterns and not tag_patterns:
return repos
filtered: List[Repository] = []
for repo in repos:
# String filter
if string_pattern:
ident_str = _build_identifier_string(repo)
if not _match_pattern(ident_str, string_pattern):
continue
# Category filter: nur echte Kategorien, KEINE Tags
if category_patterns:
cats: List[str] = []
cats.extend(map(str, repo.get("category_files", [])))
if "category" in repo:
cats.append(str(repo["category"]))
if not cats:
continue
ok = True
for pat in category_patterns:
if not _match_any(cats, pat):
ok = False
break
if not ok:
continue
# Tag filter: ausschließlich YAML-Tags
if tag_patterns:
tags: List[str] = list(map(str, repo.get("tags", [])))
if not tags:
continue
ok = True
for pat in tag_patterns:
if not _match_any(tags, pat):
ok = False
break
if not ok:
continue
filtered.append(repo)
def get_selected_repos(show_all: bool, all_repos_list, identifiers=None):
if show_all:
selected = all_repos_list
else:
selected = resolve_repos(identifiers, all_repos_list)
# If no repositories were found using the provided identifiers,
# try to automatically select based on the current directory:
if not selected:
current_dir = os.getcwd()
directory_name = os.path.basename(current_dir)
# Pack the directory name in a list since resolve_repos expects a list.
auto_selected = resolve_repos([directory_name], all_repos_list)
if auto_selected:
# Check if the path of the first auto-selected repository matches the current directory.
if os.path.abspath(auto_selected[0].get("directory")) == os.path.abspath(current_dir):
print(f"Repository {auto_selected[0]['repository']} has been auto-selected by path.")
selected = auto_selected
filtered = filter_ignored(selected)
if not filtered:
print("Error: No repositories had been selected.")
sys.exit(4)
return filtered return filtered
def get_selected_repos(args, all_repositories: List[Repository]) -> List[Repository]:
"""
Compute the list of repositories selected by CLI arguments.
Modes:
- If identifiers are given: select via resolve_repos() from all_repositories.
- Else if any of --category/--string/--tag is used: start from all_repositories
and apply filters.
- Else if --all is set: select all_repositories.
- Else: try to select the repository of the current working directory.
"""
identifiers: List[str] = getattr(args, "identifiers", []) or []
use_all: bool = bool(getattr(args, "all", False))
category_patterns: List[str] = getattr(args, "category", []) or []
string_pattern: str = getattr(args, "string", "") or ""
tag_patterns: List[str] = getattr(args, "tag", []) or []
has_filters = bool(category_patterns or string_pattern or tag_patterns)
# 1) Explicit identifiers win
if identifiers:
base = resolve_repos(identifiers, all_repositories)
return _apply_filters(base, string_pattern, category_patterns, tag_patterns)
# 2) Filter-only mode: start from all repositories
if has_filters:
return _apply_filters(list(all_repositories), string_pattern, category_patterns, tag_patterns)
# 3) --all (no filters): all repos
if use_all:
return list(all_repositories)
# 4) Fallback: try to select repository of current working directory
cwd = os.path.abspath(os.getcwd())
by_dir = [
repo
for repo in all_repositories
if os.path.abspath(str(repo.get("directory", ""))) == cwd
]
if by_dir:
return by_dir
# No specific match -> empty list
return []

View File

@@ -1,352 +1,108 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Pretty-print repository list with status, categories, tags and path.
- Tags come exclusively from YAML: repo["tags"].
- Categories come from repo["category_files"] (YAML file names without
.yml/.yaml) and optional repo["category"].
- Optional detail mode (--description) prints an extended section per
repository with description, homepage, etc.
"""
from __future__ import annotations
import os import os
import re from pkgmgr.get_repo_identifier import get_repo_identifier
from textwrap import wrap from pkgmgr.get_repo_dir import get_repo_dir
from typing import Any, Dict, List, Optional
Repository = Dict[str, Any] def list_repositories(all_repos, repositories_base_dir, bin_dir, search_filter="", status_filter=""):
RESET = "\033[0m"
BOLD = "\033[1m"
DIM = "\033[2m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
RED = "\033[31m"
MAGENTA = "\033[35m"
GREY = "\033[90m"
def _compile_maybe_regex(pattern: str) -> Optional[re.Pattern[str]]:
""" """
If pattern is of the form /.../, return a compiled regex (case-insensitive). Lists all repositories with their attributes and status information.
Otherwise return None. The repositories are sorted in ascending order by their identifier.
Parameters:
all_repos (list): List of repository configurations.
repositories_base_dir (str): The base directory where repositories are located.
bin_dir (str): The directory where executable wrappers are stored.
search_filter (str): Filter for repository attributes (case insensitive).
status_filter (str): Filter for computed status info (case insensitive).
For each repository, the identifier is printed in bold, the description (if available)
in italic, then all other attributes and computed status are printed.
If the repository is installed, a hint is displayed under the attributes.
Repositories are filtered out if either the search_filter is not found in any attribute or
if the status_filter is not found in the computed status string.
""" """
if not pattern: search_filter = search_filter.lower() if search_filter else ""
return None status_filter = status_filter.lower() if status_filter else ""
if len(pattern) >= 2 and pattern.startswith("/") and pattern.endswith("/"):
try:
return re.compile(pattern[1:-1], re.IGNORECASE)
except re.error:
return None
return None
# Define status colors using colors not used for other attributes:
# Avoid red (for ignore), blue (for homepage) and yellow (for verified).
status_colors = {
"Installed": "\033[1;32m", # Green
"Not Installed": "\033[1;35m", # Magenta
"Cloned": "\033[1;36m", # Cyan
"Clonable": "\033[1;37m", # White
"Ignored": "\033[38;5;208m", # Orange (extended)
"Active": "\033[38;5;129m", # Light Purple (extended)
"Installable": "\033[38;5;82m" # Light Green (extended)
}
def _status_matches(status: str, status_filter: str) -> bool: # Sort all repositories by their identifier in ascending order.
""" sorted_repos = sorted(all_repos, key=lambda repo: get_repo_identifier(repo, all_repos))
Match a status string against an optional filter (substring or /regex/).
"""
if not status_filter:
return True
regex = _compile_maybe_regex(status_filter) for repo in sorted_repos:
if regex: # Combine all attribute values into one string for filtering.
return bool(regex.search(status)) repo_text = " ".join(str(v) for v in repo.values()).lower()
return status_filter.lower() in status.lower() if search_filter and search_filter not in repo_text:
def _compute_repo_dir(repositories_base_dir: str, repo: Repository) -> str:
"""
Compute the local directory for a repository.
If the repository already has a 'directory' key, that is used;
otherwise the path is constructed from provider/account/repository
under repositories_base_dir.
"""
if repo.get("directory"):
return os.path.expanduser(str(repo["directory"]))
provider = str(repo.get("provider", ""))
account = str(repo.get("account", ""))
repository = str(repo.get("repository", ""))
return os.path.join(
os.path.expanduser(repositories_base_dir),
provider,
account,
repository,
)
def _compute_status(
repo: Repository,
repo_dir: str,
binaries_dir: str,
) -> str:
"""
Compute a human-readable status string, e.g. 'present,alias,ignored'.
"""
parts: List[str] = []
exists = os.path.isdir(repo_dir)
if exists:
parts.append("present")
else:
parts.append("absent")
alias = repo.get("alias")
if alias:
alias_path = os.path.join(os.path.expanduser(binaries_dir), str(alias))
if os.path.exists(alias_path):
parts.append("alias")
else:
parts.append("alias-missing")
if repo.get("ignore"):
parts.append("ignored")
return ",".join(parts) if parts else "-"
def _color_status(status_padded: str) -> str:
"""
Color individual status flags inside a padded status string.
Input is expected to be right-padded to the column width.
Color mapping:
- present -> green
- absent -> red
- alias -> red
- alias-missing -> red
- ignored -> magenta
- other -> default
"""
core = status_padded.rstrip()
pad_spaces = len(status_padded) - len(core)
plain_parts = core.split(",") if core else []
colored_parts: List[str] = []
for raw_part in plain_parts:
name = raw_part.strip()
if not name:
continue continue
if name == "present": # Compute status information for the repository.
color = GREEN identifier = get_repo_identifier(repo, all_repos)
elif name == "absent": executable_path = os.path.join(bin_dir, identifier)
color = MAGENTA repo_dir = get_repo_dir(repositories_base_dir, repo)
elif name in ("alias", "alias-missing"): status_list = []
color = YELLOW
elif name == "ignored": # Check if the executable exists (Installed).
color = MAGENTA if os.path.exists(executable_path):
status_list.append("Installed")
else: else:
color = "" status_list.append("Not Installed")
# Check if the repository directory exists (Cloned).
if color: if os.path.exists(repo_dir):
colored_parts.append(f"{color}{name}{RESET}") status_list.append("Cloned")
else: else:
colored_parts.append(name) status_list.append("Clonable")
# Mark ignored repositories.
if repo.get("ignore", False):
status_list.append("Ignored")
else:
status_list.append("Active")
# Define installable as cloned but not installed.
if os.path.exists(repo_dir) and not os.path.exists(executable_path):
status_list.append("Installable")
colored_core = ",".join(colored_parts) # Build a colored status string.
return colored_core + (" " * pad_spaces) colored_statuses = [f"{status_colors.get(s, '')}{s}\033[0m" for s in status_list]
status_str = ", ".join(colored_statuses)
# If a status_filter is provided, only display repos whose status contains the filter.
def list_repositories( if status_filter and status_filter not in status_str.lower():
repositories: List[Repository],
repositories_base_dir: str,
binaries_dir: str,
search_filter: str = "",
status_filter: str = "",
extra_tags: Optional[List[str]] = None,
show_description: bool = False,
) -> None:
"""
Print a table of repositories and (optionally) detailed descriptions.
Parameters
----------
repositories:
Repositories to show (usually already filtered by get_selected_repos).
repositories_base_dir:
Base directory where repositories live.
binaries_dir:
Directory where alias symlinks live.
search_filter:
Optional substring/regex filter on identifier and metadata.
status_filter:
Optional filter on computed status.
extra_tags:
Additional tags to show for each repository (CLI overlay only).
show_description:
If True, print a detailed block for each repository after the table.
"""
if extra_tags is None:
extra_tags = []
search_regex = _compile_maybe_regex(search_filter)
rows: List[Dict[str, Any]] = []
# ------------------------------------------------------------------
# Build rows
# ------------------------------------------------------------------
for repo in repositories:
identifier = str(repo.get("repository") or repo.get("alias") or "")
alias = str(repo.get("alias") or "")
provider = str(repo.get("provider") or "")
account = str(repo.get("account") or "")
description = str(repo.get("description") or "")
homepage = str(repo.get("homepage") or "")
repo_dir = _compute_repo_dir(repositories_base_dir, repo)
status = _compute_status(repo, repo_dir, binaries_dir)
if not _status_matches(status, status_filter):
continue continue
if search_filter: # Display repository details:
haystack = " ".join( # Print the identifier in bold.
[ print(f"\033[1m{identifier}\033[0m")
identifier, # Print the description in italic if it exists.
alias, description = repo.get("description")
provider,
account,
description,
homepage,
repo_dir,
]
)
if search_regex:
if not search_regex.search(haystack):
continue
else:
if search_filter.lower() not in haystack.lower():
continue
categories: List[str] = []
categories.extend(map(str, repo.get("category_files", [])))
if repo.get("category"):
categories.append(str(repo["category"]))
yaml_tags: List[str] = list(map(str, repo.get("tags", [])))
display_tags: List[str] = sorted(
set(yaml_tags + list(map(str, extra_tags)))
)
rows.append(
{
"repo": repo,
"identifier": identifier,
"status": status,
"categories": categories,
"tags": display_tags,
"dir": repo_dir,
}
)
if not rows:
print("No repositories matched the given filters.")
return
# ------------------------------------------------------------------
# Table section (header grey, values white, per-flag colored status)
# ------------------------------------------------------------------
ident_width = max(len("IDENTIFIER"), max(len(r["identifier"]) for r in rows))
status_width = max(len("STATUS"), max(len(r["status"]) for r in rows))
cat_width = max(
len("CATEGORIES"),
max((len(",".join(r["categories"])) for r in rows), default=0),
)
tag_width = max(
len("TAGS"),
max((len(",".join(r["tags"])) for r in rows), default=0),
)
header = (
f"{GREY}{BOLD}"
f"{'IDENTIFIER'.ljust(ident_width)} "
f"{'STATUS'.ljust(status_width)} "
f"{'CATEGORIES'.ljust(cat_width)} "
f"{'TAGS'.ljust(tag_width)} "
f"DIR"
f"{RESET}"
)
print(header)
print("-" * (ident_width + status_width + cat_width + tag_width + 10 + 40))
for r in rows:
ident_col = r["identifier"].ljust(ident_width)
cat_col = ",".join(r["categories"]).ljust(cat_width)
tag_col = ",".join(r["tags"]).ljust(tag_width)
dir_col = r["dir"]
status = r["status"]
status_padded = status.ljust(status_width)
status_colored = _color_status(status_padded)
print(
f"{ident_col} "
f"{status_colored} "
f"{cat_col} "
f"{tag_col} "
f"{dir_col}"
)
# ------------------------------------------------------------------
# Detailed section (alias value red, same status coloring)
# ------------------------------------------------------------------
if not show_description:
return
print()
print(f"{BOLD}Detailed repository information:{RESET}")
print()
for r in rows:
repo = r["repo"]
identifier = r["identifier"]
alias = str(repo.get("alias") or "")
provider = str(repo.get("provider") or "")
account = str(repo.get("account") or "")
repository = str(repo.get("repository") or "")
description = str(repo.get("description") or "")
homepage = str(repo.get("homepage") or "")
categories = r["categories"]
tags = r["tags"]
repo_dir = r["dir"]
status = r["status"]
print(f"{BOLD}{identifier}{RESET}")
print(f" Provider: {provider}")
print(f" Account: {account}")
print(f" Repository: {repository}")
# Alias value highlighted in red
if alias:
print(f" Alias: {RED}{alias}{RESET}")
status_colored = _color_status(status)
print(f" Status: {status_colored}")
if categories:
print(f" Categories: {', '.join(categories)}")
if tags:
print(f" Tags: {', '.join(tags)}")
print(f" Directory: {repo_dir}")
if homepage:
print(f" Homepage: {homepage}")
if description: if description:
print(" Description:") print(f"\n\033[3m{description}\033[0m")
for line in wrap(description, width=78): print("\nAttributes:")
print(f" {line}") # Loop through all attributes.
for key, value in repo.items():
print() formatted_value = str(value)
# Special formatting for the "verified" attribute (yellow).
if key == "verified" and value:
formatted_value = f"\033[1;33m{value}\033[0m"
# Special formatting for the "ignore" flag (red if True).
if key == "ignore" and value:
formatted_value = f"\033[1;31m{value}\033[0m"
if key == "description":
continue
# Highlight homepage in blue.
if key.lower() == "homepage" and value:
formatted_value = f"\033[1;34m{value}\033[0m"
print(f" {key}: {formatted_value}")
# Always display the computed status.
print(f" Status: {status_str}")
# If the repository is installed, display a hint for more info.
if os.path.exists(executable_path):
print(f"\nMore information and help: \033[1;4mpkgmgr {identifier} --help\033[0m\n")
print("-" * 40)

View File

@@ -1,305 +1,30 @@
#!/usr/bin/env python3 import sys
# -*- coding: utf-8 -*-
"""
Load and merge pkgmgr configuration.
Layering rules:
1. Defaults / category files:
- Zuerst werden alle *.yml/*.yaml (außer config.yaml) im
Benutzerverzeichnis geladen:
~/.config/pkgmgr/
- Falls dort keine passenden Dateien existieren, wird auf die im
Paket / Projekt mitgelieferten Config-Verzeichnisse zurückgegriffen:
<pkg_root>/config_defaults
<pkg_root>/config
<project_root>/config_defaults
<project_root>/config
Dabei werden ebenfalls alle *.yml/*.yaml als Layer geladen.
- Der Dateiname ohne Endung (stem) wird als Kategorie-Name
verwendet und in repo["category_files"] eingetragen.
2. User config:
- ~/.config/pkgmgr/config.yaml (oder der übergebene Pfad)
wird geladen und PER LISTEN-MERGE über die Defaults gelegt:
- directories: dict deep-merge
- repositories: per _merge_repo_lists (kein Löschen!)
3. Ergebnis:
- Ein dict mit mindestens:
config["directories"] (dict)
config["repositories"] (list[dict])
"""
from __future__ import annotations
import os
from pathlib import Path
from typing import Any, Dict, List, Tuple
import yaml import yaml
import os
from .get_repo_dir import get_repo_dir
DEFAULT_CONFIG_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "../","config", "defaults.yaml")
Repo = Dict[str, Any] def load_config(user_config_path):
"""Load configuration from defaults and merge in user config if present."""
if not os.path.exists(DEFAULT_CONFIG_PATH):
# --------------------------------------------------------------------------- print(f"Default configuration file '{DEFAULT_CONFIG_PATH}' not found.")
# Hilfsfunktionen sys.exit(5)
# --------------------------------------------------------------------------- with open(DEFAULT_CONFIG_PATH, 'r') as f:
config = yaml.safe_load(f)
def _deep_merge(base: Dict[str, Any], override: Dict[str, Any]) -> Dict[str, Any]: if "directories" not in config or "repositories" not in config:
""" print("Default config file must contain 'directories' and 'repositories' keys.")
Recursively merge two dictionaries. sys.exit(6)
if os.path.exists(user_config_path):
Values from `override` win over values in `base`. with open(user_config_path, 'r') as f:
""" user_config = yaml.safe_load(f)
for key, value in override.items(): if user_config:
if ( if "directories" in user_config:
key in base config["directories"] = user_config["directories"]
and isinstance(base[key], dict) if "repositories" in user_config:
and isinstance(value, dict) config["repositories"].extend(user_config["repositories"])
): for repository in config["repositories"]:
_deep_merge(base[key], value) # You can overwritte the directory path in the config
else: if "directory" not in repository:
base[key] = value directory = get_repo_dir(config["directories"]["repositories"], repository)
return base repository["directory"] = os.path.expanduser(directory)
return config
def _repo_key(repo: Repo) -> Tuple[str, str, str]:
"""
Normalised key for identifying a repository across config files.
"""
return (
str(repo.get("provider", "")),
str(repo.get("account", "")),
str(repo.get("repository", "")),
)
def _merge_repo_lists(
    base_list: List[Repo],
    new_list: List[Repo],
    category_name: str | None = None,
) -> List[Repo]:
    """
    Merge *new_list* into *base_list*, matching by (provider, account,
    repository).

    Behaviour:
      - Repos not yet present in base_list are appended (as shallow copies).
      - Repos already present are deep-merged field by field (new wins).
      - Entries with a completely empty key are always appended, since
        they cannot be matched reliably.
      - If category_name is given, it is recorded in the affected repo's
        'category_files' list (created on demand, no duplicates).

    Returns:
        base_list, mutated in place.
    """

    def _tag_category(repo: Repo) -> None:
        # Record the originating category file on the repo, once.
        if category_name:
            repo.setdefault("category_files", [])
            if category_name not in repo["category_files"]:
                repo["category_files"].append(category_name)

    index: Dict[Tuple[str, str, str], Repo] = {
        _repo_key(r): r for r in base_list
    }

    for src in new_list:
        key = _repo_key(src)
        # An all-empty key is never matched against existing entries.
        existing = None if key == ("", "", "") else index.get(key)
        if existing is None:
            dst = dict(src)
            _tag_category(dst)
            base_list.append(dst)
            if key != ("", "", ""):
                index[key] = dst
        else:
            _deep_merge(existing, src)
            _tag_category(existing)

    return base_list
def _load_yaml_file(path: Path) -> Dict[str, Any]:
    """
    Read *path* as YAML and return its top-level mapping.

    Missing files, empty documents and documents whose top level is not a
    mapping all yield an empty dict.
    """
    if not path.is_file():
        return {}
    with path.open("r", encoding="utf-8") as handle:
        loaded = yaml.safe_load(handle)
    return loaded if isinstance(loaded, dict) else {}
def _load_layer_dir(
    config_dir: Path,
    skip_filename: str | None = None,
) -> Dict[str, Any]:
    """
    Load all *.yml/*.yaml files from a directory as layered defaults.

    Parameters:
        config_dir: Directory to scan for YAML layer files.
        skip_filename: File name (e.g. "config.yaml") that should be
            ignored — typically the user config itself.

    Returns:
        {
            "directories": {...},
            "repositories": [...],
        }
    """
    defaults: Dict[str, Any] = {"directories": {}, "repositories": []}

    if not config_dir.is_dir():
        return defaults

    # Collect candidate layer files, excluding the user config if named.
    yaml_files = [
        p
        for p in config_dir.iterdir()
        if p.is_file()
        and p.suffix.lower() in (".yml", ".yaml")
        and (skip_filename is None or p.name != skip_filename)
    ]

    if not yaml_files:
        return defaults

    # Deterministic layering order: later (alphabetically) files override.
    yaml_files.sort(key=lambda p: p.name)

    for path in yaml_files:
        data = _load_yaml_file(path)
        category_name = path.stem  # file name without .yml/.yaml

        # Directories are deep-merged across layers.
        dirs = data.get("directories")
        if isinstance(dirs, dict):
            defaults.setdefault("directories", {})
            _deep_merge(defaults["directories"], dirs)

        # Repositories are list-merged; the file stem tags each repo's
        # 'category_files'.
        repos = data.get("repositories")
        if isinstance(repos, list):
            defaults.setdefault("repositories", [])
            _merge_repo_lists(
                defaults["repositories"],
                repos,
                category_name=category_name,
            )

    return defaults
def _load_defaults_from_package_or_project() -> Dict[str, Any]:
    """
    Fallback: try to load defaults from the installed package OR from the
    project root:

        <pkg_root>/config_defaults
        <pkg_root>/config
        <project_root>/config_defaults
        <project_root>/config

    Returns the layer data of the first candidate directory that yields
    any 'directories' or 'repositories'; otherwise an empty structure.
    """
    try:
        import pkgmgr  # type: ignore
    except Exception:
        # pkgmgr not importable (e.g. running from a bare checkout):
        # there is nothing to fall back to.
        return {"directories": {}, "repositories": []}

    # Resolve package and project roots from the installed module path.
    pkg_root = Path(pkgmgr.__file__).resolve().parent
    project_root = pkg_root.parent

    candidates = [
        pkg_root / "config_defaults",
        pkg_root / "config",
        project_root / "config_defaults",
        project_root / "config",
    ]

    for cand in candidates:
        defaults = _load_layer_dir(cand, skip_filename=None)
        if defaults["directories"] or defaults["repositories"]:
            return defaults

    return {"directories": {}, "repositories": []}
# ---------------------------------------------------------------------------
# Hauptfunktion
# ---------------------------------------------------------------------------
def load_config(user_config_path: str) -> Dict[str, Any]:
    """
    Load and merge configuration for pkgmgr.

    Steps:
    1. Determine ~/.config/pkgmgr/ (or the directory of user_config_path).
    2. Load all *.yml/*.yaml there (except the user config itself) as
       defaults / category layers.
    3. If nothing was found there, fall back to package/project defaults.
    4. Load the user config file itself (if present).
    5. Merge:
       - directories: deep-merge (defaults <- user)
       - repositories: _merge_repo_lists (defaults <- user)

    Returns:
        A dict with at least 'directories' (dict) and 'repositories'
        (list of dicts).
    """
    user_config_path_expanded = os.path.expanduser(user_config_path)
    user_cfg_path = Path(user_config_path_expanded)

    config_dir = user_cfg_path.parent
    if not str(config_dir):
        # Fallback if only a bare "config.yaml" was passed.
        # NOTE(review): Path(...).parent never stringifies to "" (a bare
        # filename yields Path('.')), so this branch looks unreachable —
        # confirm whether the ~/.config/pkgmgr fallback is still intended.
        config_dir = Path(os.path.expanduser("~/.config/pkgmgr"))
    config_dir.mkdir(parents=True, exist_ok=True)

    user_cfg_name = user_cfg_path.name

    # 1+2) Defaults / category layers from the user directory.
    defaults = _load_layer_dir(config_dir, skip_filename=user_cfg_name)

    # 3) If nothing was found there, fall back to package/project defaults.
    if not defaults["directories"] and not defaults["repositories"]:
        defaults = _load_defaults_from_package_or_project()

    defaults.setdefault("directories", {})
    defaults.setdefault("repositories", [])

    # 4) User config (a missing file simply means an empty overlay).
    user_cfg: Dict[str, Any] = {}
    if user_cfg_path.is_file():
        user_cfg = _load_yaml_file(user_cfg_path)

    user_cfg.setdefault("directories", {})
    user_cfg.setdefault("repositories", [])

    # 5) Merge: directories deep-merge, repositories list-merge.
    merged: Dict[str, Any] = {}

    # directories
    merged["directories"] = {}
    _deep_merge(merged["directories"], defaults["directories"])
    _deep_merge(merged["directories"], user_cfg["directories"])

    # repositories
    merged["repositories"] = []
    _merge_repo_lists(merged["repositories"], defaults["repositories"], category_name=None)
    _merge_repo_lists(merged["repositories"], user_cfg["repositories"], category_name=None)

    # Other top-level keys (if any): dicts are deep-merged, otherwise a
    # non-None user value overrides the default.
    other_keys = (set(defaults.keys()) | set(user_cfg.keys())) - {
        "directories",
        "repositories",
    }
    for key in other_keys:
        base_val = defaults.get(key)
        override_val = user_cfg.get(key)
        if isinstance(base_val, dict) and isinstance(override_val, dict):
            merged[key] = _deep_merge(dict(base_val), override_val)
        elif override_val is not None:
            merged[key] = override_val
        else:
            merged[key] = base_val

    return merged

View File

@@ -1,3 +1,4 @@
# pkgmgr/release.py
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
@@ -22,12 +23,14 @@ Additional behaviour:
phases: phases:
1) Preview-only run (dry-run). 1) Preview-only run (dry-run).
2) Interactive confirmation, then real release if confirmed. 2) Interactive confirmation, then real release if confirmed.
This confirmation can be skipped with the `-f/--force` flag. This confirmation can be skipped with the `force=True` flag.
- If `close=True` is used and the current branch is not main/master,
the branch will be closed via branch_commands.close_branch() after
a successful release.
""" """
from __future__ import annotations from __future__ import annotations
import argparse
import os import os
import re import re
import subprocess import subprocess
@@ -37,6 +40,7 @@ from datetime import date, datetime
from typing import Optional, Tuple from typing import Optional, Tuple
from pkgmgr.git_utils import get_tags, get_current_branch, GitError from pkgmgr.git_utils import get_tags, get_current_branch, GitError
from pkgmgr.branch_commands import close_branch
from pkgmgr.versioning import ( from pkgmgr.versioning import (
SemVer, SemVer,
find_latest_version, find_latest_version,
@@ -137,7 +141,6 @@ def _open_editor_for_changelog(initial_message: Optional[str] = None) -> str:
encoding="utf-8", encoding="utf-8",
) as tmp: ) as tmp:
tmp_path = tmp.name tmp_path = tmp.name
# Prefill with instructions as comments
tmp.write( tmp.write(
"# Write the changelog entry for this release.\n" "# Write the changelog entry for this release.\n"
"# Lines starting with '#' will be ignored.\n" "# Lines starting with '#' will be ignored.\n"
@@ -147,10 +150,14 @@ def _open_editor_for_changelog(initial_message: Optional[str] = None) -> str:
tmp.write(initial_message.strip() + "\n") tmp.write(initial_message.strip() + "\n")
tmp.flush() tmp.flush()
# Open editor try:
subprocess.call([editor, tmp_path]) subprocess.call([editor, tmp_path])
except FileNotFoundError:
print(
f"[WARN] Editor {editor!r} not found; proceeding without "
"interactive changelog message."
)
# Read back content
try: try:
with open(tmp_path, "r", encoding="utf-8") as f: with open(tmp_path, "r", encoding="utf-8") as f:
content = f.read() content = f.read()
@@ -160,7 +167,6 @@ def _open_editor_for_changelog(initial_message: Optional[str] = None) -> str:
except OSError: except OSError:
pass pass
# Filter out commented lines and return joined text
lines = [ lines = [
line for line in content.splitlines() line for line in content.splitlines()
if not line.strip().startswith("#") if not line.strip().startswith("#")
@@ -186,14 +192,6 @@ def update_pyproject_version(
version = "X.Y.Z" version = "X.Y.Z"
and replaces the version part with the given new_version string. and replaces the version part with the given new_version string.
It does not try to parse the full TOML structure here. This keeps the
implementation small and robust as long as the version line follows
the standard pattern.
Behaviour:
- In normal mode: write the updated content back to the file.
- In preview mode: do NOT write, only report what would change.
""" """
try: try:
with open(pyproject_path, "r", encoding="utf-8") as f: with open(pyproject_path, "r", encoding="utf-8") as f:
@@ -231,13 +229,6 @@ def update_flake_version(
) -> None: ) -> None:
""" """
Update the version in flake.nix, if present. Update the version in flake.nix, if present.
Looks for a line like:
version = "1.2.3";
and replaces the string inside the quotes. If the file does not
exist or no version line is found, this is treated as a non-fatal
condition and only a log message is printed.
""" """
if not os.path.exists(flake_path): if not os.path.exists(flake_path):
print("[INFO] flake.nix not found, skipping.") print("[INFO] flake.nix not found, skipping.")
@@ -282,13 +273,6 @@ def update_pkgbuild_version(
Expects: Expects:
pkgver=1.2.3 pkgver=1.2.3
pkgrel=1 pkgrel=1
Behaviour:
- Set pkgver to the new_version (e.g. 1.2.3).
- Reset pkgrel to 1.
If the file does not exist, this is non-fatal and only a log
message is printed.
""" """
if not os.path.exists(pkgbuild_path): if not os.path.exists(pkgbuild_path):
print("[INFO] PKGBUILD not found, skipping.") print("[INFO] PKGBUILD not found, skipping.")
@@ -301,7 +285,6 @@ def update_pkgbuild_version(
print(f"[WARN] Could not read PKGBUILD: {exc}") print(f"[WARN] Could not read PKGBUILD: {exc}")
return return
# Update pkgver
ver_pattern = r"^(pkgver\s*=\s*)(.+)$" ver_pattern = r"^(pkgver\s*=\s*)(.+)$"
new_content, ver_count = re.subn( new_content, ver_count = re.subn(
ver_pattern, ver_pattern,
@@ -312,9 +295,8 @@ def update_pkgbuild_version(
if ver_count == 0: if ver_count == 0:
print("[WARN] No pkgver line found in PKGBUILD.") print("[WARN] No pkgver line found in PKGBUILD.")
new_content = content # revert to original if we didn't change anything new_content = content
# Reset pkgrel to 1
rel_pattern = r"^(pkgrel\s*=\s*)(.+)$" rel_pattern = r"^(pkgrel\s*=\s*)(.+)$"
new_content, rel_count = re.subn( new_content, rel_count = re.subn(
rel_pattern, rel_pattern,
@@ -343,19 +325,6 @@ def update_spec_version(
) -> None: ) -> None:
""" """
Update the version in an RPM spec file, if present. Update the version in an RPM spec file, if present.
Assumes a file like 'package-manager.spec' with lines:
Version: 1.2.3
Release: 1%{?dist}
Behaviour:
- Set 'Version:' to new_version.
- Reset 'Release:' to '1' while preserving any macro suffix,
e.g. '1%{?dist}'.
If the file does not exist, this is non-fatal and only a log
message is printed.
""" """
if not os.path.exists(spec_path): if not os.path.exists(spec_path):
print("[INFO] RPM spec file not found, skipping.") print("[INFO] RPM spec file not found, skipping.")
@@ -368,7 +337,6 @@ def update_spec_version(
print(f"[WARN] Could not read spec file: {exc}") print(f"[WARN] Could not read spec file: {exc}")
return return
# Update Version:
ver_pattern = r"^(Version:\s*)(.+)$" ver_pattern = r"^(Version:\s*)(.+)$"
new_content, ver_count = re.subn( new_content, ver_count = re.subn(
ver_pattern, ver_pattern,
@@ -380,12 +348,10 @@ def update_spec_version(
if ver_count == 0: if ver_count == 0:
print("[WARN] No 'Version:' line found in spec file.") print("[WARN] No 'Version:' line found in spec file.")
# Reset Release:
rel_pattern = r"^(Release:\s*)(.+)$" rel_pattern = r"^(Release:\s*)(.+)$"
def _release_repl(m: re.Match[str]) -> str: # type: ignore[name-defined] def _release_repl(m: re.Match[str]) -> str: # type: ignore[name-defined]
rest = m.group(2).strip() rest = m.group(2).strip()
# Reset numeric prefix to "1" and keep any suffix (e.g. % macros).
match = re.match(r"^(\d+)(.*)$", rest) match = re.match(r"^(\d+)(.*)$", rest)
if match: if match:
suffix = match.group(2) suffix = match.group(2)
@@ -428,21 +394,11 @@ def update_changelog(
""" """
Prepend a new release section to CHANGELOG.md with the new version, Prepend a new release section to CHANGELOG.md with the new version,
current date, and a message. current date, and a message.
Behaviour:
- If message is None and preview is False:
→ open $EDITOR (fallback 'nano') to let the user enter a message.
- If message is None and preview is True:
→ use a generic automated message.
- The resulting changelog entry is printed to stdout.
- Returns the final message text used.
""" """
today = date.today().isoformat() today = date.today().isoformat()
# Resolve message
if message is None: if message is None:
if preview: if preview:
# Do not open editor in preview mode; keep it non-interactive.
message = "Automated release." message = "Automated release."
else: else:
print( print(
@@ -470,7 +426,6 @@ def update_changelog(
new_changelog = header + "\n" + changelog if changelog else header new_changelog = header + "\n" + changelog if changelog else header
# Show the entry that will be written
print("\n================ CHANGELOG ENTRY ================") print("\n================ CHANGELOG ENTRY ================")
print(header.rstrip()) print(header.rstrip())
print("=================================================\n") print("=================================================\n")
@@ -495,8 +450,6 @@ def update_changelog(
def _get_git_config_value(key: str) -> Optional[str]: def _get_git_config_value(key: str) -> Optional[str]:
""" """
Try to read a value from `git config --get <key>`. Try to read a value from `git config --get <key>`.
Returns the stripped value or None if not set / on error.
""" """
try: try:
result = subprocess.run( result = subprocess.run(
@@ -515,12 +468,6 @@ def _get_git_config_value(key: str) -> Optional[str]:
def _get_debian_author() -> Tuple[str, str]: def _get_debian_author() -> Tuple[str, str]:
""" """
Determine the maintainer name/email for debian/changelog entries. Determine the maintainer name/email for debian/changelog entries.
Priority:
1. DEBFULLNAME / DEBEMAIL
2. GIT_AUTHOR_NAME / GIT_AUTHOR_EMAIL
3. git config user.name / user.email
4. Fallback: 'Unknown Maintainer' / 'unknown@example.com'
""" """
name = os.environ.get("DEBFULLNAME") name = os.environ.get("DEBFULLNAME")
email = os.environ.get("DEBEMAIL") email = os.environ.get("DEBEMAIL")
@@ -552,12 +499,6 @@ def update_debian_changelog(
) -> None: ) -> None:
""" """
Prepend a new entry to debian/changelog, if it exists. Prepend a new entry to debian/changelog, if it exists.
The first line typically looks like:
package-name (1.2.3-1) unstable; urgency=medium
We generate a new stanza at the top with Debian-style version
'X.Y.Z-1'. If the file does not exist, this function does nothing.
""" """
if not os.path.exists(debian_changelog_path): if not os.path.exists(debian_changelog_path):
print("[INFO] debian/changelog not found, skipping.") print("[INFO] debian/changelog not found, skipping.")
@@ -565,15 +506,12 @@ def update_debian_changelog(
debian_version = f"{new_version}-1" debian_version = f"{new_version}-1"
now = datetime.now().astimezone() now = datetime.now().astimezone()
# Debian-like date string, e.g. "Mon, 08 Dec 2025 12:34:56 +0100"
date_str = now.strftime("%a, %d %b %Y %H:%M:%S %z") date_str = now.strftime("%a, %d %b %Y %H:%M:%S %z")
author_name, author_email = _get_debian_author() author_name, author_email = _get_debian_author()
first_line = f"{package_name} ({debian_version}) unstable; urgency=medium" first_line = f"{package_name} ({debian_version}) unstable; urgency=medium"
body_line = ( body_line = message.strip() if message else f"Automated release {new_version}."
message.strip() if message else f"Automated release {new_version}."
)
stanza = ( stanza = (
f"{first_line}\n\n" f"{first_line}\n\n"
f" * {body_line}\n\n" f" * {body_line}\n\n"
@@ -613,23 +551,12 @@ def _release_impl(
release_type: str = "patch", release_type: str = "patch",
message: Optional[str] = None, message: Optional[str] = None,
preview: bool = False, preview: bool = False,
close: bool = False,
) -> None: ) -> None:
""" """
Internal implementation that performs a single-phase release. Internal implementation that performs a single-phase release.
If `preview` is True:
- No files are written.
- No git commands are executed.
- Planned actions are printed.
If `preview` is False:
- Files are updated.
- Git commit, tag, and push are executed.
""" """
# 1) Determine the current version from Git tags.
current_ver = _determine_current_version() current_ver = _determine_current_version()
# 2) Compute the next version.
new_ver = _bump_semver(current_ver, release_type) new_ver = _bump_semver(current_ver, release_type)
new_ver_str = str(new_ver) new_ver_str = str(new_ver)
new_tag = new_ver.to_tag(with_prefix=True) new_tag = new_ver.to_tag(with_prefix=True)
@@ -639,20 +566,16 @@ def _release_impl(
print(f"Current version: {current_ver}") print(f"Current version: {current_ver}")
print(f"New version: {new_ver_str} ({release_type})") print(f"New version: {new_ver_str} ({release_type})")
# Determine repository root based on pyproject location
repo_root = os.path.dirname(os.path.abspath(pyproject_path)) repo_root = os.path.dirname(os.path.abspath(pyproject_path))
# 2) Update files.
update_pyproject_version(pyproject_path, new_ver_str, preview=preview) update_pyproject_version(pyproject_path, new_ver_str, preview=preview)
# Let update_changelog resolve or edit the message; reuse it for debian. changelog_message = update_changelog(
message = update_changelog(
changelog_path, changelog_path,
new_ver_str, new_ver_str,
message=message, message=message,
preview=preview, preview=preview,
) )
# Additional packaging files (non-fatal if missing)
flake_path = os.path.join(repo_root, "flake.nix") flake_path = os.path.join(repo_root, "flake.nix")
update_flake_version(flake_path, new_ver_str, preview=preview) update_flake_version(flake_path, new_ver_str, preview=preview)
@@ -662,20 +585,23 @@ def _release_impl(
spec_path = os.path.join(repo_root, "package-manager.spec") spec_path = os.path.join(repo_root, "package-manager.spec")
update_spec_version(spec_path, new_ver_str, preview=preview) update_spec_version(spec_path, new_ver_str, preview=preview)
effective_message: Optional[str] = message
if effective_message is None and isinstance(changelog_message, str):
if changelog_message.strip():
effective_message = changelog_message.strip()
debian_changelog_path = os.path.join(repo_root, "debian", "changelog") debian_changelog_path = os.path.join(repo_root, "debian", "changelog")
# Use repo directory name as a simple default for package name
package_name = os.path.basename(repo_root) or "package-manager" package_name = os.path.basename(repo_root) or "package-manager"
update_debian_changelog( update_debian_changelog(
debian_changelog_path, debian_changelog_path,
package_name=package_name, package_name=package_name,
new_version=new_ver_str, new_version=new_ver_str,
message=message, message=effective_message,
preview=preview, preview=preview,
) )
# 3) Git operations: stage, commit, tag, push.
commit_msg = f"Release version {new_ver_str}" commit_msg = f"Release version {new_ver_str}"
tag_msg = message or commit_msg tag_msg = effective_message or commit_msg
try: try:
branch = get_current_branch() or "main" branch = get_current_branch() or "main"
@@ -683,7 +609,6 @@ def _release_impl(
branch = "main" branch = "main"
print(f"Releasing on branch: {branch}") print(f"Releasing on branch: {branch}")
# Stage all relevant packaging files so they are included in the commit
files_to_add = [ files_to_add = [
pyproject_path, pyproject_path,
changelog_path, changelog_path,
@@ -701,6 +626,18 @@ def _release_impl(
print(f'[PREVIEW] Would run: git tag -a {new_tag} -m "{tag_msg}"') print(f'[PREVIEW] Would run: git tag -a {new_tag} -m "{tag_msg}"')
print(f"[PREVIEW] Would run: git push origin {branch}") print(f"[PREVIEW] Would run: git push origin {branch}")
print("[PREVIEW] Would run: git push origin --tags") print("[PREVIEW] Would run: git push origin --tags")
if close and branch not in ("main", "master"):
print(
f"[PREVIEW] Would also close branch {branch} after the release "
"(close=True and branch is not main/master)."
)
elif close:
print(
f"[PREVIEW] close=True but current branch is {branch}; "
"no branch would be closed."
)
print("Preview completed. No changes were made.") print("Preview completed. No changes were made.")
return return
@@ -714,9 +651,26 @@ def _release_impl(
print(f"Release {new_ver_str} completed.") print(f"Release {new_ver_str} completed.")
if close:
if branch in ("main", "master"):
print(
f"[INFO] close=True but current branch is {branch}; "
"nothing to close."
)
return
print(
f"[INFO] Closing branch {branch} after successful release "
"(close=True and branch is not main/master)..."
)
try:
close_branch(name=branch, base_branch="main", cwd=".")
except Exception as exc: # pragma: no cover
print(f"[WARN] Failed to close branch {branch} automatically: {exc}")
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Public release entry point (with preview-first + confirmation logic) # Public release entry point
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -727,6 +681,7 @@ def release(
message: Optional[str] = None, message: Optional[str] = None,
preview: bool = False, preview: bool = False,
force: bool = False, force: bool = False,
close: bool = False,
) -> None: ) -> None:
""" """
High-level release entry point. High-level release entry point.
@@ -735,26 +690,13 @@ def release(
- preview=True: - preview=True:
* Single-phase PREVIEW only. * Single-phase PREVIEW only.
* No files are changed, no git commands are executed.
* `force` is ignored in this mode.
- preview=False, force=True: - preview=False, force=True:
* Single-phase REAL release, no interactive preview. * Single-phase REAL release, no interactive preview.
* Files are changed and git commands are executed immediately.
- preview=False, force=False: - preview=False, force=False:
* Two-phase flow (intended default for interactive CLI use): * Two-phase flow (intended default for interactive CLI use).
1) PREVIEW: dry-run, printing all planned actions.
2) Ask the user for confirmation:
"Proceed with the actual release? [y/N]: "
If confirmed, perform the REAL release.
Otherwise, abort without changes.
* In non-interactive environments (stdin not a TTY), the
confirmation step is skipped automatically and a single
REAL phase is executed, to avoid blocking on input().
""" """
# Explicit preview mode: just do a single PREVIEW phase and exit.
if preview: if preview:
_release_impl( _release_impl(
pyproject_path=pyproject_path, pyproject_path=pyproject_path,
@@ -762,10 +704,10 @@ def release(
release_type=release_type, release_type=release_type,
message=message, message=message,
preview=True, preview=True,
close=close,
) )
return return
# Non-preview, but forced: run REAL release directly.
if force: if force:
_release_impl( _release_impl(
pyproject_path=pyproject_path, pyproject_path=pyproject_path,
@@ -773,10 +715,10 @@ def release(
release_type=release_type, release_type=release_type,
message=message, message=message,
preview=False, preview=False,
close=close,
) )
return return
# Non-interactive environment? Skip confirmation to avoid blocking.
if not sys.stdin.isatty(): if not sys.stdin.isatty():
_release_impl( _release_impl(
pyproject_path=pyproject_path, pyproject_path=pyproject_path,
@@ -784,10 +726,10 @@ def release(
release_type=release_type, release_type=release_type,
message=message, message=message,
preview=False, preview=False,
close=close,
) )
return return
# Interactive two-phase flow:
print("[INFO] Running preview before actual release...\n") print("[INFO] Running preview before actual release...\n")
_release_impl( _release_impl(
pyproject_path=pyproject_path, pyproject_path=pyproject_path,
@@ -795,9 +737,9 @@ def release(
release_type=release_type, release_type=release_type,
message=message, message=message,
preview=True, preview=True,
close=close,
) )
# Ask for confirmation
try: try:
answer = input("Proceed with the actual release? [y/N]: ").strip().lower() answer = input("Proceed with the actual release? [y/N]: ").strip().lower()
except (EOFError, KeyboardInterrupt): except (EOFError, KeyboardInterrupt):
@@ -815,68 +757,5 @@ def release(
release_type=release_type, release_type=release_type,
message=message, message=message,
preview=False, preview=False,
) close=close,
# ---------------------------------------------------------------------------
# CLI entry point for standalone use
# ---------------------------------------------------------------------------
def _parse_args(argv: Optional[list[str]] = None) -> argparse.Namespace:
parser = argparse.ArgumentParser(description="pkgmgr release helper")
parser.add_argument(
"release_type",
choices=["major", "minor", "patch"],
help="Type of release (major/minor/patch).",
)
parser.add_argument(
"-m",
"--message",
dest="message",
default=None,
help="Release message to use for changelog and tag.",
)
parser.add_argument(
"--pyproject",
dest="pyproject",
default="pyproject.toml",
help="Path to pyproject.toml (default: pyproject.toml)",
)
parser.add_argument(
"--changelog",
dest="changelog",
default="CHANGELOG.md",
help="Path to CHANGELOG.md (default: CHANGELOG.md)",
)
parser.add_argument(
"--preview",
action="store_true",
help=(
"Preview release changes without modifying files or running git. "
"This mode never executes the real release."
),
)
parser.add_argument(
"-f",
"--force",
dest="force",
action="store_true",
help=(
"Skip the interactive preview+confirmation step and run the "
"release directly."
),
)
return parser.parse_args(argv)
if __name__ == "__main__":
args = _parse_args()
release(
pyproject_path=args.pyproject,
changelog_path=args.changelog,
release_type=args.release_type,
message=args.message,
preview=args.preview,
force=args.force,
) )

View File

@@ -7,7 +7,7 @@ build-backend = "setuptools.build_meta"
[project] [project]
name = "package-manager" name = "package-manager"
version = "0.3.0" version = "0.4.2"
description = "Kevin's package-manager tool (pkgmgr)" description = "Kevin's package-manager tool (pkgmgr)"
readme = "README.md" readme = "README.md"
requires-python = ">=3.11" requires-python = ">=3.11"

View File

@@ -11,8 +11,8 @@ class TestIntegrationBranchCommands(unittest.TestCase):
Integration tests for the `pkgmgr branch` CLI wiring. Integration tests for the `pkgmgr branch` CLI wiring.
These tests execute the real entry point (main.py) and mock These tests execute the real entry point (main.py) and mock
the high-level `open_branch` helper to ensure that argument the high-level helpers to ensure that argument parsing and
parsing and dispatch behave as expected. dispatch behave as expected.
""" """
def _run_pkgmgr(self, extra_args: list[str]) -> None: def _run_pkgmgr(self, extra_args: list[str]) -> None:
@@ -64,6 +64,46 @@ class TestIntegrationBranchCommands(unittest.TestCase):
self.assertEqual(kwargs.get("base_branch"), "main") self.assertEqual(kwargs.get("base_branch"), "main")
self.assertEqual(kwargs.get("cwd"), ".") self.assertEqual(kwargs.get("cwd"), ".")
# ------------------------------------------------------------------
# close subcommand
# ------------------------------------------------------------------
@patch("pkgmgr.cli_core.commands.branch.close_branch")
def test_branch_close_with_name_and_base(self, mock_close_branch) -> None:
"""
`pkgmgr branch close feature/test --base develop` must forward
the name and base branch to close_branch() with cwd=".".
"""
self._run_pkgmgr(
["branch", "close", "feature/test", "--base", "develop"]
)
mock_close_branch.assert_called_once()
_, kwargs = mock_close_branch.call_args
self.assertEqual(kwargs.get("name"), "feature/test")
self.assertEqual(kwargs.get("base_branch"), "develop")
self.assertEqual(kwargs.get("cwd"), ".")
@patch("pkgmgr.cli_core.commands.branch.close_branch")
def test_branch_close_without_name_uses_default_base(
self,
mock_close_branch,
) -> None:
"""
`pkgmgr branch close` without a name must still call close_branch(),
passing name=None and the default base branch 'main'.
The branch helper will then resolve the actual base (main/master)
internally.
"""
self._run_pkgmgr(["branch", "close"])
mock_close_branch.assert_called_once()
_, kwargs = mock_close_branch.call_args
self.assertIsNone(kwargs.get("name"))
self.assertEqual(kwargs.get("base_branch"), "main")
self.assertEqual(kwargs.get("cwd"), ".")
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()

View File

@@ -1,64 +0,0 @@
from __future__ import annotations
import os
import runpy
import sys
import unittest
from test_integration_version_commands import PROJECT_ROOT
class TestIntegrationListCommands(unittest.TestCase):
"""
Integration tests for `pkgmgr list` with the new selection and
description behaviour.
"""
def _run_pkgmgr(self, args: list[str], cwd: str | None = None) -> None:
cmd_repr = "pkgmgr " + " ".join(args)
original_argv = list(sys.argv)
original_cwd = os.getcwd()
try:
if cwd is not None:
os.chdir(cwd)
# Simulate: pkgmgr <args...>
sys.argv = ["pkgmgr"] + args
try:
runpy.run_module("main", run_name="__main__")
except SystemExit as exc:
code = exc.code if isinstance(exc.code, int) else str(exc.code)
if code != 0:
print()
print(f"[TEST] Command : {cmd_repr}")
print(f"[TEST] Working directory: {os.getcwd()}")
print(f"[TEST] Exit code : {code}")
raise AssertionError(
f"{cmd_repr!r} failed with exit code {code}. "
"Scroll up to inspect the output printed before failure."
) from exc
finally:
os.chdir(original_cwd)
sys.argv = original_argv
def test_list_all_repositories(self) -> None:
"""
`pkgmgr list --all` should successfully print the summary table.
"""
self._run_pkgmgr(["list", "--all"], cwd=PROJECT_ROOT)
def test_list_all_with_description(self) -> None:
"""
`pkgmgr list --all --description` should print the table plus the
detailed section for each repository.
"""
self._run_pkgmgr(["list", "--all", "--description"], cwd=PROJECT_ROOT)
def test_list_with_string_filter(self) -> None:
"""
`pkgmgr list --string pkgmgr` exercises the new string-based
selection logic on top of the defaults + user config.
"""
self._run_pkgmgr(["list", "--string", "pkgmgr"], cwd=PROJECT_ROOT)

View File

@@ -1,65 +0,0 @@
from __future__ import annotations
import os
import runpy
import sys
import unittest
from test_integration_version_commands import PROJECT_ROOT
class TestIntegrationProxyCommands(unittest.TestCase):
"""
Integration tests for proxy commands (e.g. git pull) using the new
selection logic and `--preview` mode so no real changes are made.
"""
def _run_pkgmgr(self, args: list[str], cwd: str | None = None) -> None:
cmd_repr = "pkgmgr " + " ".join(args)
original_argv = list(sys.argv)
original_cwd = os.getcwd()
try:
if cwd is not None:
os.chdir(cwd)
# Simulate: pkgmgr <args...>
sys.argv = ["pkgmgr"] + args
try:
runpy.run_module("main", run_name="__main__")
except SystemExit as exc:
code = exc.code if isinstance(exc.code, int) else str(exc.code)
if code != 0:
print()
print(f"[TEST] Command : {cmd_repr}")
print(f"[TEST] Working directory: {os.getcwd()}")
print(f"[TEST] Exit code : {code}")
raise AssertionError(
f"{cmd_repr!r} failed with exit code {code}. "
"Scroll up to inspect the output printed before failure."
) from exc
finally:
os.chdir(original_cwd)
sys.argv = original_argv
def test_git_pull_preview_for_pkgmgr(self) -> None:
"""
`pkgmgr pull --preview pkgmgr` should go through the proxy layer,
use get_selected_repos() and only print the underlying git pull
command without executing it.
"""
self._run_pkgmgr(
["pull", "--preview", "pkgmgr"],
cwd=PROJECT_ROOT,
)
def test_git_pull_preview_with_string_filter(self) -> None:
"""
`pkgmgr pull --preview --string pkgmgr` exercises the proxy +
filter-only selection path.
"""
self._run_pkgmgr(
["pull", "--preview", "--string", "pkgmgr"],
cwd=PROJECT_ROOT,
)

View File

@@ -1,99 +1,75 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
End-to-end style integration tests for the `pkgmgr release` CLI command.
These tests exercise the top-level `pkgmgr` entry point by invoking
the module as `__main__` and verifying that the underlying
`pkgmgr.release.release()` function is called with the expected
arguments, in particular the new `close` flag.
"""
from __future__ import annotations from __future__ import annotations
import os
import runpy import runpy
import sys import sys
import unittest import unittest
from unittest.mock import patch
PROJECT_ROOT = os.path.abspath(
os.path.join(os.path.dirname(__file__), "..", "..")
)
class TestIntegrationReleaseCommand(unittest.TestCase): class TestIntegrationReleaseCommand(unittest.TestCase):
def _run_pkgmgr( """Integration tests for `pkgmgr release` wiring."""
self,
argv: list[str],
expect_success: bool,
) -> None:
"""
Run the main entry point with the given argv and assert on success/failure.
argv must include the program name as argv[0], e.g. "": def _run_pkgmgr(self, argv: list[str]) -> None:
["", "release", "patch", "pkgmgr", "--preview"] """
Helper to invoke the `pkgmgr` console script via `run_module`.
This simulates a real CLI call like:
pkgmgr release minor --preview --close
""" """
cmd_repr = " ".join(argv[1:])
original_argv = list(sys.argv) original_argv = list(sys.argv)
try: try:
sys.argv = argv sys.argv = argv
try: # Entry point: the `pkgmgr` module is the console script.
# Execute main.py as if called via `python main.py ...` runpy.run_module("pkgmgr", run_name="__main__")
runpy.run_module("main", run_name="__main__")
except SystemExit as exc:
code = exc.code if isinstance(exc.code, int) else 1
if expect_success and code != 0:
print()
print(f"[TEST] Command : {cmd_repr}")
print(f"[TEST] Exit code : {code}")
raise AssertionError(
f"{cmd_repr!r} failed with exit code {code}. "
"Scroll up to inspect the output printed before failure."
) from exc
if not expect_success and code == 0:
print()
print(f"[TEST] Command : {cmd_repr}")
print(f"[TEST] Exit code : {code}")
raise AssertionError(
f"{cmd_repr!r} unexpectedly succeeded with exit code 0."
) from exc
else:
# No SystemExit: treat as success when expect_success is True,
# otherwise as a failure (we expected a non-zero exit).
if not expect_success:
raise AssertionError(
f"{cmd_repr!r} returned normally (expected non-zero exit)."
)
finally: finally:
sys.argv = original_argv sys.argv = original_argv
def test_release_for_unknown_repo_fails_cleanly(self) -> None: @patch("pkgmgr.release.release")
def test_release_without_close_flag(self, mock_release) -> None:
""" """
Releasing a non-existent repository identifier must fail Calling `pkgmgr release patch --preview` should *not* enable
with a non-zero exit code, but without crashing the interpreter. the `close` flag by default.
""" """
argv = [ self._run_pkgmgr(["pkgmgr", "release", "patch", "--preview"])
"",
"release",
"patch",
"does-not-exist-xyz",
]
self._run_pkgmgr(argv, expect_success=False)
def test_release_preview_for_pkgmgr_repository(self) -> None: mock_release.assert_called_once()
""" _args, kwargs = mock_release.call_args
Sanity-check the happy path for the CLI:
- Runs `pkgmgr release patch pkgmgr --preview` # CLI wiring
- Must exit with code 0 self.assertEqual(kwargs.get("release_type"), "patch")
- Uses the real configuration + repository selection self.assertTrue(kwargs.get("preview"), "preview should be True when --preview is used")
- Exercises the new --preview mode end-to-end. # Default: no --close → close=False
""" self.assertFalse(kwargs.get("close"), "close must be False when --close is not given")
argv = [
"",
"release",
"patch",
"pkgmgr",
"--preview",
]
original_cwd = os.getcwd() @patch("pkgmgr.release.release")
try: def test_release_with_close_flag(self, mock_release) -> None:
os.chdir(PROJECT_ROOT) """
self._run_pkgmgr(argv, expect_success=True) Calling `pkgmgr release minor --preview --close` should pass
finally: close=True into pkgmgr.release.release().
os.chdir(original_cwd) """
self._run_pkgmgr(["pkgmgr", "release", "minor", "--preview", "--close"])
mock_release.assert_called_once()
_args, kwargs = mock_release.call_args
# CLI wiring
self.assertEqual(kwargs.get("release_type"), "minor")
self.assertTrue(kwargs.get("preview"), "preview should be True when --preview is used")
# With --close → close=True
self.assertTrue(kwargs.get("close"), "close must be True when --close is given")
if __name__ == "__main__": if __name__ == "__main__":

View File

View File

@@ -0,0 +1,206 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Unit tests for pkgmgr.cli_core.commands.release.
These tests focus on the wiring layer:
- Argument handling for the release command as defined by the
top-level parser (cli_core.parser.create_parser).
- Correct invocation of pkgmgr.release.release(...) for the
selected repositories.
- Behaviour of --preview, --list, --close, and -f/--force.
"""
from __future__ import annotations
from types import SimpleNamespace
from typing import List
from unittest.mock import patch, call
import argparse
import unittest
class TestReleaseCommand(unittest.TestCase):
"""
Tests for the `pkgmgr release` CLI wiring.
"""
def _make_ctx(self, all_repos: List[dict]) -> SimpleNamespace:
"""
Create a minimal CLIContext-like object for tests.
Only the attributes that handle_release() uses are provided.
"""
return SimpleNamespace(
config_merged={},
repositories_base_dir="/base/dir",
all_repositories=all_repos,
binaries_dir="/bin",
user_config_path="/tmp/config.yaml",
)
def _parse_release_args(self, argv: List[str]) -> argparse.Namespace:
"""
Build a real top-level parser and parse the given argv list
to obtain the Namespace for the `release` command.
"""
from pkgmgr.cli_core.parser import create_parser
parser = create_parser("test parser")
args = parser.parse_args(argv)
self.assertEqual(args.command, "release")
return args
@patch("pkgmgr.cli_core.commands.release.os.path.isdir", return_value=True)
@patch("pkgmgr.cli_core.commands.release.run_release")
@patch("pkgmgr.cli_core.commands.release.get_repo_dir")
@patch("pkgmgr.cli_core.commands.release.get_repo_identifier")
@patch("pkgmgr.cli_core.commands.release.os.chdir")
@patch("pkgmgr.cli_core.commands.release.os.getcwd", return_value="/cwd")
def test_release_with_close_and_message(
self,
mock_getcwd,
mock_chdir,
mock_get_repo_identifier,
mock_get_repo_dir,
mock_run_release,
mock_isdir,
) -> None:
"""
The release handler should call pkgmgr.release.release() with:
- release_type (e.g. minor)
- provided message
- preview flag
- force flag
- close flag
It must change into the repository directory and then back.
"""
from pkgmgr.cli_core.commands.release import handle_release
repo = {"name": "dummy-repo"}
selected = [repo]
ctx = self._make_ctx(selected)
mock_get_repo_identifier.return_value = "dummy-id"
mock_get_repo_dir.return_value = "/repos/dummy"
argv = [
"release",
"minor",
"dummy-id",
"-m",
"Close branch after minor release",
"--close",
"-f",
]
args = self._parse_release_args(argv)
handle_release(args, ctx, selected)
# We should have changed into the repo dir and then back.
mock_chdir.assert_has_calls(
[call("/repos/dummy"), call("/cwd")]
)
# And run_release should be invoked once with the expected parameters.
mock_run_release.assert_called_once_with(
pyproject_path="pyproject.toml",
changelog_path="CHANGELOG.md",
release_type="minor",
message="Close branch after minor release",
preview=False,
force=True,
close=True,
)
@patch("pkgmgr.cli_core.commands.release.os.path.isdir", return_value=True)
@patch("pkgmgr.cli_core.commands.release.run_release")
@patch("pkgmgr.cli_core.commands.release.get_repo_dir")
@patch("pkgmgr.cli_core.commands.release.get_repo_identifier")
@patch("pkgmgr.cli_core.commands.release.os.chdir")
@patch("pkgmgr.cli_core.commands.release.os.getcwd", return_value="/cwd")
def test_release_preview_mode(
self,
mock_getcwd,
mock_chdir,
mock_get_repo_identifier,
mock_get_repo_dir,
mock_run_release,
mock_isdir,
) -> None:
"""
In preview mode, the handler should pass preview=True to the
release helper and force=False by default.
"""
from pkgmgr.cli_core.commands.release import handle_release
repo = {"name": "dummy-repo"}
selected = [repo]
ctx = self._make_ctx(selected)
mock_get_repo_identifier.return_value = "dummy-id"
mock_get_repo_dir.return_value = "/repos/dummy"
argv = [
"release",
"patch",
"dummy-id",
"--preview",
]
args = self._parse_release_args(argv)
handle_release(args, ctx, selected)
mock_run_release.assert_called_once_with(
pyproject_path="pyproject.toml",
changelog_path="CHANGELOG.md",
release_type="patch",
message=None,
preview=True,
force=False,
close=False,
)
@patch("pkgmgr.cli_core.commands.release.run_release")
@patch("pkgmgr.cli_core.commands.release.get_repo_dir")
@patch("pkgmgr.cli_core.commands.release.get_repo_identifier")
def test_release_list_mode_does_not_invoke_helper(
self,
mock_get_repo_identifier,
mock_get_repo_dir,
mock_run_release,
) -> None:
"""
When --list is provided, the handler should print the list of affected
repositories and must NOT invoke run_release().
"""
from pkgmgr.cli_core.commands.release import handle_release
repo1 = {"name": "repo-1"}
repo2 = {"name": "repo-2"}
selected = [repo1, repo2]
ctx = self._make_ctx(selected)
mock_get_repo_identifier.side_effect = ["id-1", "id-2"]
argv = [
"release",
"major",
"--list",
]
args = self._parse_release_args(argv)
handle_release(args, ctx, selected)
mock_run_release.assert_not_called()
self.assertEqual(
mock_get_repo_identifier.call_args_list,
[call(repo1, selected), call(repo2, selected)],
)
if __name__ == "__main__":
unittest.main()

View File

@@ -73,16 +73,20 @@ class TestCliVersion(unittest.TestCase):
# Patch get_selected_repos so that 'version' operates on our temp dir. # Patch get_selected_repos so that 'version' operates on our temp dir.
# In the new modular CLI this function is used inside # In the new modular CLI this function is used inside
# pkgmgr.cli_core.dispatch, so we patch it there. # pkgmgr.cli_core.dispatch, so we patch it there.
def _fake_selected_repos(args, all_repositories): def _fake_selected_repos(
all_flag: bool,
repos: List[dict],
identifiers: List[str],
):
# We always return exactly one "repository" whose directory is the temp dir.
return [ return [
{ {
"provider": "github.com", "provider": "github.com",
"account": "test", "account": "test",
"repository": "pkgmgr-test", "repository": "pkgmgr-test",
"directory": self._tmp_dir.name, "directory": self._tmp_dir.name,
} }
] ]
self._patch_get_selected_repos = mock.patch( self._patch_get_selected_repos = mock.patch(
"pkgmgr.cli_core.dispatch.get_selected_repos", "pkgmgr.cli_core.dispatch.get_selected_repos",

View File

@@ -66,6 +66,55 @@ class TestCliBranch(unittest.TestCase):
self.assertEqual(call_kwargs.get("base_branch"), "main") self.assertEqual(call_kwargs.get("base_branch"), "main")
self.assertEqual(call_kwargs.get("cwd"), ".") self.assertEqual(call_kwargs.get("cwd"), ".")
# ------------------------------------------------------------------
# close subcommand
# ------------------------------------------------------------------
@patch("pkgmgr.cli_core.commands.branch.close_branch")
def test_handle_branch_close_forwards_args_to_close_branch(self, mock_close_branch) -> None:
"""
handle_branch('close') should call close_branch with name, base and cwd='.'.
"""
args = SimpleNamespace(
command="branch",
subcommand="close",
name="feature/cli-close",
base="develop",
)
ctx = self._dummy_ctx()
handle_branch(args, ctx)
mock_close_branch.assert_called_once()
_, call_kwargs = mock_close_branch.call_args
self.assertEqual(call_kwargs.get("name"), "feature/cli-close")
self.assertEqual(call_kwargs.get("base_branch"), "develop")
self.assertEqual(call_kwargs.get("cwd"), ".")
@patch("pkgmgr.cli_core.commands.branch.close_branch")
def test_handle_branch_close_uses_default_base_when_not_set(self, mock_close_branch) -> None:
"""
If --base is not passed for 'close', argparse gives base='main'
(default), and handle_branch should propagate that to close_branch.
"""
args = SimpleNamespace(
command="branch",
subcommand="close",
name=None,
base="main",
)
ctx = self._dummy_ctx()
handle_branch(args, ctx)
mock_close_branch.assert_called_once()
_, call_kwargs = mock_close_branch.call_args
self.assertIsNone(call_kwargs.get("name"))
self.assertEqual(call_kwargs.get("base_branch"), "main")
self.assertEqual(call_kwargs.get("cwd"), ".")
def test_handle_branch_unknown_subcommand_exits_with_code_2(self) -> None: def test_handle_branch_unknown_subcommand_exits_with_code_2(self) -> None:
""" """
Unknown branch subcommand should result in SystemExit(2). Unknown branch subcommand should result in SystemExit(2).

View File

@@ -365,6 +365,7 @@ class TestUpdateDebianChangelog(unittest.TestCase):
class TestReleaseOrchestration(unittest.TestCase): class TestReleaseOrchestration(unittest.TestCase):
@patch("pkgmgr.release.sys.stdin.isatty", return_value=False)
@patch("pkgmgr.release._run_git_command") @patch("pkgmgr.release._run_git_command")
@patch("pkgmgr.release.update_debian_changelog") @patch("pkgmgr.release.update_debian_changelog")
@patch("pkgmgr.release.update_spec_version") @patch("pkgmgr.release.update_spec_version")
@@ -387,6 +388,7 @@ class TestReleaseOrchestration(unittest.TestCase):
mock_update_spec, mock_update_spec,
mock_update_debian_changelog, mock_update_debian_changelog,
mock_run_git_command, mock_run_git_command,
mock_isatty,
) -> None: ) -> None:
mock_determine_current_version.return_value = SemVer(1, 2, 3) mock_determine_current_version.return_value = SemVer(1, 2, 3)
mock_bump_semver.return_value = SemVer(1, 2, 4) mock_bump_semver.return_value = SemVer(1, 2, 4)
@@ -449,6 +451,7 @@ class TestReleaseOrchestration(unittest.TestCase):
self.assertIn("git push origin develop", git_calls) self.assertIn("git push origin develop", git_calls)
self.assertIn("git push origin --tags", git_calls) self.assertIn("git push origin --tags", git_calls)
@patch("pkgmgr.release.sys.stdin.isatty", return_value=False)
@patch("pkgmgr.release._run_git_command") @patch("pkgmgr.release._run_git_command")
@patch("pkgmgr.release.update_debian_changelog") @patch("pkgmgr.release.update_debian_changelog")
@patch("pkgmgr.release.update_spec_version") @patch("pkgmgr.release.update_spec_version")
@@ -471,6 +474,7 @@ class TestReleaseOrchestration(unittest.TestCase):
mock_update_spec, mock_update_spec,
mock_update_debian_changelog, mock_update_debian_changelog,
mock_run_git_command, mock_run_git_command,
mock_isatty,
) -> None: ) -> None:
mock_determine_current_version.return_value = SemVer(1, 2, 3) mock_determine_current_version.return_value = SemVer(1, 2, 3)
mock_bump_semver.return_value = SemVer(1, 2, 4) mock_bump_semver.return_value = SemVer(1, 2, 4)