Compare commits

...

7 Commits

Author SHA1 Message Date
Kevin Veen-Birkenbach
7d73007181 Release version 0.5.1 2025-12-09 01:21:31 +01:00
Kevin Veen-Birkenbach
c8462fefa4 Release version 0.5.0 2025-12-09 00:44:16 +01:00
Kevin Veen-Birkenbach
00a1f373ce Merge branch 'feature/config_v2.0' 2025-12-09 00:29:19 +01:00
Kevin Veen-Birkenbach
9f9f2e68c0 Release version 0.4.3 2025-12-09 00:29:08 +01:00
Kevin Veen-Birkenbach
d25dcb05e4 Merge branch 'feature/branch_close' 2025-12-09 00:03:56 +01:00
Kevin Veen-Birkenbach
e135d39710 Release version 0.4.2 2025-12-09 00:03:46 +01:00
Kevin Veen-Birkenbach
8ea7ff23e9 Release version 0.3.0 2025-12-08 22:40:50 +01:00
28 changed files with 2175 additions and 617 deletions

View File

@@ -1,3 +1,23 @@
## [0.5.1] - 2025-12-09
* Refine pkgmgr release CLI close wiring and integration tests for --close flag (ChatGPT: https://chatgpt.com/share/69376b4e-8440-800f-9d06-535ec1d7a40e)
## [0.5.0] - 2025-12-09
* Add pkgmgr branch close subcommand, extend CLI parser wiring, and add unit tests for branch handling and version version-selection logic (see ChatGPT conversation: https://chatgpt.com/share/693762a3-9ea8-800f-a640-bc78170953d1)
## [0.4.3] - 2025-12-09
* Implement current-directory repository selection for release and proxy commands, unify selection semantics across CLI layers, extend release workflow with --close, integrate branch closing logic, fix wiring for get_repo_identifier/get_repo_dir, update packaging files (PKGBUILD, spec, flake.nix, pyproject), and add comprehensive unit/e2e tests for release and branch commands (see ChatGPT conversation: https://chatgpt.com/share/69375cfe-9e00-800f-bd65-1bd5937e1696)
## [0.4.2] - 2025-12-09
* Wire pkgmgr release CLI to new helper and add unit tests (see ChatGPT conversation: https://chatgpt.com/share/69374f09-c760-800f-92e4-5b44a4510b62)
## [0.4.1] - 2025-12-08
* Add branch close subcommand and integrate release close/editor flow (ChatGPT: https://chatgpt.com/share/69374f09-c760-800f-92e4-5b44a4510b62)
@@ -7,6 +27,17 @@
* Add branch closing helper and --close flag to release command, including CLI wiring and tests (see https://chatgpt.com/share/69374aec-74ec-800f-bde3-5d91dfdb9b91)
## [0.3.0] - 2025-12-08
* Massive refactor and feature expansion:
- Complete rewrite of config loading system (layered defaults + user config)
- New selection engine (--string, --category, --tag)
- Overhauled list output (colored statuses, alias highlight)
- New config update logic + default YAML sync
- Improved proxy command handling
- Full CLI routing refactor
- Expanded E2E tests for list, proxy, and selection logic
Konversation: https://chatgpt.com/share/693745c3-b8d8-800f-aa29-c8481a2ffae1
## [0.2.0] - 2025-12-08

View File

@@ -1,7 +1,7 @@
# Maintainer: Kevin Veen-Birkenbach <info@veen.world>
pkgname=package-manager
pkgver=0.4.1
pkgver=0.5.1
pkgrel=1
pkgdesc="Local-flake wrapper for Kevin's package-manager (Nix-based)."
arch=('any')

7
config/wip.yml Normal file
View File

@@ -0,0 +1,7 @@
- account: kevinveenbirkenbach
alias: gkfdrtdtcntr
provider: github.com
repository: federated-to-central-social-network-bridge
verified:
gpg_keys:
- 44D8F11FD62F878E

38
debian/changelog vendored
View File

@@ -1,3 +1,27 @@
package-manager (0.5.1-1) unstable; urgency=medium
* Refine pkgmgr release CLI close wiring and integration tests for --close flag (ChatGPT: https://chatgpt.com/share/69376b4e-8440-800f-9d06-535ec1d7a40e)
-- Kevin Veen-Birkenbach <kevin@veen.world> Tue, 09 Dec 2025 01:21:31 +0100
package-manager (0.5.0-1) unstable; urgency=medium
* Add pkgmgr branch close subcommand, extend CLI parser wiring, and add unit tests for branch handling and version version-selection logic (see ChatGPT conversation: https://chatgpt.com/share/693762a3-9ea8-800f-a640-bc78170953d1)
-- Kevin Veen-Birkenbach <kevin@veen.world> Tue, 09 Dec 2025 00:44:16 +0100
package-manager (0.4.3-1) unstable; urgency=medium
* Implement current-directory repository selection for release and proxy commands, unify selection semantics across CLI layers, extend release workflow with --close, integrate branch closing logic, fix wiring for get_repo_identifier/get_repo_dir, update packaging files (PKGBUILD, spec, flake.nix, pyproject), and add comprehensive unit/e2e tests for release and branch commands (see ChatGPT conversation: https://chatgpt.com/share/69375cfe-9e00-800f-bd65-1bd5937e1696)
-- Kevin Veen-Birkenbach <kevin@veen.world> Tue, 09 Dec 2025 00:29:08 +0100
package-manager (0.4.2-1) unstable; urgency=medium
* Wire pkgmgr release CLI to new helper and add unit tests (see ChatGPT conversation: https://chatgpt.com/share/69374f09-c760-800f-92e4-5b44a4510b62)
-- Kevin Veen-Birkenbach <kevin@veen.world> Tue, 09 Dec 2025 00:03:46 +0100
package-manager (0.4.1-1) unstable; urgency=medium
* Add branch close subcommand and integrate release close/editor flow (ChatGPT: https://chatgpt.com/share/69374f09-c760-800f-92e4-5b44a4510b62)
@@ -10,6 +34,20 @@ package-manager (0.4.0-1) unstable; urgency=medium
-- Kevin Veen-Birkenbach <kevin@veen.world> Mon, 08 Dec 2025 23:02:43 +0100
package-manager (0.3.0-1) unstable; urgency=medium
* Massive refactor and feature expansion:
- Complete rewrite of config loading system (layered defaults + user config)
- New selection engine (--string, --category, --tag)
- Overhauled list output (colored statuses, alias highlight)
- New config update logic + default YAML sync
- Improved proxy command handling
- Full CLI routing refactor
- Expanded E2E tests for list, proxy, and selection logic
Konversation: https://chatgpt.com/share/693745c3-b8d8-800f-aa29-c8481a2ffae1
-- Kevin Veen-Birkenbach <kevin@veen.world> Mon, 08 Dec 2025 22:40:49 +0100
package-manager (0.2.0-1) unstable; urgency=medium
* Add preview-first release workflow and extended packaging support (see ChatGPT conversation: https://chatgpt.com/share/693722b4-af9c-800f-bccc-8a4036e99630)

View File

@@ -31,7 +31,7 @@
rec {
pkgmgr = pyPkgs.buildPythonApplication {
pname = "package-manager";
version = "0.4.1";
version = "0.5.1";
# Use the git repo as source
src = ./.;

View File

@@ -1,5 +1,5 @@
Name: package-manager
Version: 0.4.1
Version: 0.5.1
Release: 1%{?dist}
Summary: Wrapper that runs Kevin's package-manager via Nix flake

View File

@@ -9,9 +9,9 @@ import sys
from pkgmgr.load_config import load_config
from pkgmgr.cli_core import CLIContext, create_parser, dispatch_command
# Define configuration file paths.
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
USER_CONFIG_PATH = os.path.join(PROJECT_ROOT, "config", "config.yaml")
# User config lives in the home directory:
# ~/.config/pkgmgr/config.yaml
USER_CONFIG_PATH = os.path.expanduser("~/.config/pkgmgr/config.yaml")
DESCRIPTION_TEXT = """\
\033[1;32mPackage Manager 🤖📦\033[0m
@@ -63,20 +63,31 @@ For detailed help on each command, use:
def main() -> None:
# Load merged configuration
"""
Entry point for the pkgmgr CLI.
"""
config_merged = load_config(USER_CONFIG_PATH)
repositories_base_dir = os.path.expanduser(
config_merged["directories"]["repositories"]
# Directories: be robust and provide sane defaults if missing
directories = config_merged.get("directories") or {}
repositories_dir = os.path.expanduser(
directories.get("repositories", "~/Repositories")
)
binaries_dir = os.path.expanduser(
config_merged["directories"]["binaries"]
directories.get("binaries", "~/.local/bin")
)
all_repositories = config_merged["repositories"]
# Ensure the merged config actually contains the resolved directories
config_merged.setdefault("directories", {})
config_merged["directories"]["repositories"] = repositories_dir
config_merged["directories"]["binaries"] = binaries_dir
all_repositories = config_merged.get("repositories", [])
ctx = CLIContext(
config_merged=config_merged,
repositories_base_dir=repositories_base_dir,
repositories_base_dir=repositories_dir,
all_repositories=all_repositories,
binaries_dir=binaries_dir,
user_config_path=USER_CONFIG_PATH,
@@ -85,7 +96,6 @@ def main() -> None:
parser = create_parser(DESCRIPTION_TEXT)
args = parser.parse_args()
# If no subcommand is provided, show help
if not getattr(args, "command", None):
parser.print_help()
return

View File

@@ -1,8 +1,13 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import os
import sys
from typing import Any, Dict, List
import shutil
from pathlib import Path
from typing import Any, Dict
import yaml
@@ -17,29 +22,103 @@ from pkgmgr.run_command import run_command
def _load_user_config(user_config_path: str) -> Dict[str, Any]:
"""
Load the user config file, returning a default structure if it does not exist.
Load the user config from ~/.config/pkgmgr/config.yaml
(or whatever ctx.user_config_path is), creating the directory if needed.
"""
if os.path.exists(user_config_path):
with open(user_config_path, "r") as f:
user_config_path_expanded = os.path.expanduser(user_config_path)
cfg_dir = os.path.dirname(user_config_path_expanded)
if cfg_dir and not os.path.isdir(cfg_dir):
os.makedirs(cfg_dir, exist_ok=True)
if os.path.exists(user_config_path_expanded):
with open(user_config_path_expanded, "r", encoding="utf-8") as f:
return yaml.safe_load(f) or {"repositories": []}
return {"repositories": []}
def _find_defaults_source_dir() -> str | None:
"""
Find the directory inside the installed pkgmgr package OR the
project root that contains default config files.
Preferred locations (in dieser Reihenfolge):
- <pkg_root>/config_defaults
- <pkg_root>/config
- <project_root>/config_defaults
- <project_root>/config
"""
import pkgmgr # local import to avoid circular deps
pkg_root = Path(pkgmgr.__file__).resolve().parent
project_root = pkg_root.parent
candidates = [
pkg_root / "config_defaults",
pkg_root / "config",
project_root / "config_defaults",
project_root / "config",
]
for cand in candidates:
if cand.is_dir():
return str(cand)
return None
def _update_default_configs(user_config_path: str) -> None:
"""
Copy all default *.yml/*.yaml files from the installed pkgmgr package
into ~/.config/pkgmgr/, overwriting existing ones except the user
config file itself (config.yaml), which is never touched.
"""
source_dir = _find_defaults_source_dir()
if not source_dir:
print(
"[WARN] No config_defaults or config directory found in "
"pkgmgr installation. Nothing to update."
)
return
dest_dir = os.path.dirname(os.path.expanduser(user_config_path))
if not dest_dir:
dest_dir = os.path.expanduser("~/.config/pkgmgr")
os.makedirs(dest_dir, exist_ok=True)
for name in os.listdir(source_dir):
lower = name.lower()
if not (lower.endswith(".yml") or lower.endswith(".yaml")):
continue
if name == "config.yaml":
# Never overwrite the user config template / live config
continue
src = os.path.join(source_dir, name)
dst = os.path.join(dest_dir, name)
shutil.copy2(src, dst)
print(f"[INFO] Updated default config file: {dst}")
def handle_config(args, ctx: CLIContext) -> None:
"""
Handle the 'config' command and its subcommands.
Handle 'pkgmgr config' subcommands.
"""
user_config_path = ctx.user_config_path
# --------------------------------------------------------
# ------------------------------------------------------------
# config show
# --------------------------------------------------------
# ------------------------------------------------------------
if args.subcommand == "show":
if args.all or (not args.identifiers):
# Full merged config view
show_config([], user_config_path, full_config=True)
else:
selected = resolve_repos(args.identifiers, ctx.all_repositories)
# Show only matching entries from user config
user_config = _load_user_config(user_config_path)
selected = resolve_repos(
args.identifiers,
user_config.get("repositories", []),
)
if selected:
show_config(
selected,
@@ -48,23 +127,23 @@ def handle_config(args, ctx: CLIContext) -> None:
)
return
# --------------------------------------------------------
# ------------------------------------------------------------
# config add
# --------------------------------------------------------
# ------------------------------------------------------------
if args.subcommand == "add":
interactive_add(ctx.config_merged, user_config_path)
return
# --------------------------------------------------------
# ------------------------------------------------------------
# config edit
# --------------------------------------------------------
# ------------------------------------------------------------
if args.subcommand == "edit":
run_command(f"nano {user_config_path}")
return
# --------------------------------------------------------
# ------------------------------------------------------------
# config init
# --------------------------------------------------------
# ------------------------------------------------------------
if args.subcommand == "init":
user_config = _load_user_config(user_config_path)
config_init(
@@ -75,14 +154,17 @@ def handle_config(args, ctx: CLIContext) -> None:
)
return
# --------------------------------------------------------
# ------------------------------------------------------------
# config delete
# --------------------------------------------------------
# ------------------------------------------------------------
if args.subcommand == "delete":
user_config = _load_user_config(user_config_path)
if args.all or not args.identifiers:
print("You must specify identifiers to delete.")
print(
"[ERROR] 'config delete' requires explicit identifiers. "
"Use 'config show' to inspect entries."
)
return
to_delete = resolve_repos(
@@ -99,14 +181,17 @@ def handle_config(args, ctx: CLIContext) -> None:
print(f"Deleted {len(to_delete)} entries from user config.")
return
# --------------------------------------------------------
# ------------------------------------------------------------
# config ignore
# --------------------------------------------------------
# ------------------------------------------------------------
if args.subcommand == "ignore":
user_config = _load_user_config(user_config_path)
if args.all or not args.identifiers:
print("You must specify identifiers to modify ignore flag.")
print(
"[ERROR] 'config ignore' requires explicit identifiers. "
"Use 'config show' to inspect entries."
)
return
to_modify = resolve_repos(
@@ -135,6 +220,21 @@ def handle_config(args, ctx: CLIContext) -> None:
save_user_config(user_config, user_config_path)
return
# If we end up here, something is wrong with subcommand routing
# ------------------------------------------------------------
# config update
# ------------------------------------------------------------
if args.subcommand == "update":
"""
Copy default YAML configs from the installed package into the
user's ~/.config/pkgmgr directory.
This will overwrite files with the same name (except config.yaml).
"""
_update_default_configs(user_config_path)
return
# ------------------------------------------------------------
# Unknown subcommand
# ------------------------------------------------------------
print(f"Unknown config subcommand: {args.subcommand}")
sys.exit(2)

View File

@@ -1,12 +1,31 @@
# pkgmgr/cli_core/commands/release.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Release command wiring for the pkgmgr CLI.
This module implements the `pkgmgr release` subcommand on top of the
generic selection logic from cli_core.dispatch. It does not define its
own subparser; the CLI surface is configured in cli_core.parser.
Responsibilities:
- Take the parsed argparse.Namespace for the `release` command.
- Use the list of selected repositories provided by dispatch_command().
- Optionally list affected repositories when --list is set.
- For each selected repository, run pkgmgr.release.release(...) in
the context of that repository directory.
"""
from __future__ import annotations
import os
import sys
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List
from pkgmgr.cli_core.context import CLIContext
from pkgmgr.get_repo_dir import get_repo_dir
from pkgmgr import release as rel
from pkgmgr.get_repo_identifier import get_repo_identifier
from pkgmgr.release import release as run_release
Repository = Dict[str, Any]
@@ -18,59 +37,63 @@ def handle_release(
selected: List[Repository],
) -> None:
"""
Handle the 'release' command.
Handle the `pkgmgr release` subcommand.
Creates a release by incrementing the version and updating the changelog
in a single selected repository.
Important:
- Releases are strictly limited to exactly ONE repository.
- Using --all or specifying multiple identifiers for release does
not make sense and is therefore rejected.
- The --preview flag is respected and passed through to the release
implementation so that no changes are made in preview mode.
Flow:
1) Use the `selected` repositories as computed by dispatch_command().
2) If --list is given, print the identifiers of the selected repos
and return without running any release.
3) For each selected repository:
- Resolve its identifier and local directory.
- Change into that directory.
- Call pkgmgr.release.release(...) with the parsed options.
"""
if not selected:
print("No repositories selected for release.")
sys.exit(1)
print("[pkgmgr] No repositories selected for release.")
return
# List-only mode: show which repositories would be affected.
if getattr(args, "list", False):
print("[pkgmgr] Repositories that would be affected by this release:")
for repo in selected:
identifier = get_repo_identifier(repo, ctx.all_repositories)
print(f" - {identifier}")
return
for repo in selected:
identifier = get_repo_identifier(repo, ctx.all_repositories)
repo_dir = repo.get("directory")
if not repo_dir:
try:
repo_dir = get_repo_dir(ctx.repositories_base_dir, repo)
except Exception:
repo_dir = None
if not repo_dir or not os.path.isdir(repo_dir):
print(
f"[WARN] Skipping repository {identifier}: "
"local directory does not exist."
)
continue
if len(selected) > 1:
print(
"[ERROR] Release operations are limited to a single repository.\n"
"Do not use --all or multiple identifiers with 'pkgmgr release'."
f"[pkgmgr] Running release for repository {identifier} "
f"in '{repo_dir}'..."
)
sys.exit(1)
original_dir = os.getcwd()
repo = selected[0]
repo_dir: Optional[str] = repo.get("directory")
if not repo_dir:
repo_dir = get_repo_dir(ctx.repositories_base_dir, repo)
if not os.path.isdir(repo_dir):
print(
f"[ERROR] Repository directory does not exist locally: {repo_dir}"
)
sys.exit(1)
pyproject_path = os.path.join(repo_dir, "pyproject.toml")
changelog_path = os.path.join(repo_dir, "CHANGELOG.md")
print(
f"Releasing repository '{repo.get('repository')}' in '{repo_dir}'..."
)
os.chdir(repo_dir)
try:
rel.release(
pyproject_path=pyproject_path,
changelog_path=changelog_path,
release_type=args.release_type,
message=args.message,
preview=getattr(args, "preview", False),
)
finally:
os.chdir(original_dir)
# Change to repo directory and invoke the helper.
cwd_before = os.getcwd()
try:
os.chdir(repo_dir)
run_release(
pyproject_path="pyproject.toml",
changelog_path="CHANGELOG.md",
release_type=args.release_type,
message=args.message or None,
preview=getattr(args, "preview", False),
force=getattr(args, "force", False),
close=getattr(args, "close", False),
)
finally:
os.chdir(cwd_before)

View File

@@ -1,3 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import sys
@@ -12,7 +15,7 @@ from pkgmgr.status_repos import status_repos
from pkgmgr.list_repositories import list_repositories
from pkgmgr.run_command import run_command
from pkgmgr.create_repo import create_repo
from pkgmgr.get_selected_repos import get_selected_repos
Repository = Dict[str, Any]
@@ -23,15 +26,12 @@ def handle_repos_command(
selected: List[Repository],
) -> None:
"""
Handle repository-related commands:
- install / update / deinstall / delete / status
- path / shell
- create / list
Handle core repository commands (install/update/deinstall/delete/.../list).
"""
# --------------------------------------------------------
# install / update
# --------------------------------------------------------
# ------------------------------------------------------------
# install
# ------------------------------------------------------------
if args.command == "install":
install_repos(
selected,
@@ -46,6 +46,9 @@ def handle_repos_command(
)
return
# ------------------------------------------------------------
# update
# ------------------------------------------------------------
if args.command == "update":
update_repos(
selected,
@@ -61,9 +64,9 @@ def handle_repos_command(
)
return
# --------------------------------------------------------
# deinstall / delete
# --------------------------------------------------------
# ------------------------------------------------------------
# deinstall
# ------------------------------------------------------------
if args.command == "deinstall":
deinstall_repos(
selected,
@@ -74,6 +77,9 @@ def handle_repos_command(
)
return
# ------------------------------------------------------------
# delete
# ------------------------------------------------------------
if args.command == "delete":
delete_repos(
selected,
@@ -83,9 +89,9 @@ def handle_repos_command(
)
return
# --------------------------------------------------------
# ------------------------------------------------------------
# status
# --------------------------------------------------------
# ------------------------------------------------------------
if args.command == "status":
status_repos(
selected,
@@ -98,20 +104,20 @@ def handle_repos_command(
)
return
# --------------------------------------------------------
# ------------------------------------------------------------
# path
# --------------------------------------------------------
# ------------------------------------------------------------
if args.command == "path":
for repository in selected:
print(repository["directory"])
return
# --------------------------------------------------------
# ------------------------------------------------------------
# shell
# --------------------------------------------------------
# ------------------------------------------------------------
if args.command == "shell":
if not args.shell_command:
print("No shell command specified.")
print("[ERROR] 'shell' requires a command via -c/--command.")
sys.exit(2)
command_to_run = " ".join(args.shell_command)
for repository in selected:
@@ -125,13 +131,13 @@ def handle_repos_command(
)
return
# --------------------------------------------------------
# ------------------------------------------------------------
# create
# --------------------------------------------------------
# ------------------------------------------------------------
if args.command == "create":
if not args.identifiers:
print(
"No identifiers provided. Please specify at least one identifier "
"[ERROR] 'create' requires at least one identifier "
"in the format provider/account/repository."
)
sys.exit(1)
@@ -147,15 +153,19 @@ def handle_repos_command(
)
return
# --------------------------------------------------------
# ------------------------------------------------------------
# list
# --------------------------------------------------------
# ------------------------------------------------------------
if args.command == "list":
list_repositories(
ctx.all_repositories,
selected,
ctx.repositories_base_dir,
ctx.binaries_dir,
search_filter=args.search,
status_filter=args.status,
status_filter=getattr(args, "status", "") or "",
extra_tags=getattr(args, "tag", []) or [],
show_description=getattr(args, "description", False),
)
return
print(f"[ERROR] Unknown repos command: {args.command}")
sys.exit(2)

View File

@@ -1,11 +1,16 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import os
import sys
from typing import List
from typing import List, Dict, Any
from pkgmgr.cli_core.context import CLIContext
from pkgmgr.cli_core.proxy import maybe_handle_proxy
from pkgmgr.get_selected_repos import get_selected_repos
from pkgmgr.get_repo_dir import get_repo_dir
from pkgmgr.cli_core.commands import (
handle_repos_command,
@@ -19,20 +24,73 @@ from pkgmgr.cli_core.commands import (
)
def _has_explicit_selection(args) -> bool:
"""
Return True if the user explicitly selected repositories via
identifiers / --all / --category / --tag / --string.
"""
identifiers = getattr(args, "identifiers", []) or []
use_all = getattr(args, "all", False)
categories = getattr(args, "category", []) or []
tags = getattr(args, "tag", []) or []
string_filter = getattr(args, "string", "") or ""
return bool(
use_all
or identifiers
or categories
or tags
or string_filter
)
def _select_repo_for_current_directory(
ctx: CLIContext,
) -> List[Dict[str, Any]]:
"""
Heuristic: find the repository whose local directory matches the
current working directory or is the closest parent.
Example:
- Repo directory: /home/kevin/Repositories/foo
- CWD: /home/kevin/Repositories/foo/subdir
'foo' is selected.
"""
cwd = os.path.abspath(os.getcwd())
candidates: List[tuple[str, Dict[str, Any]]] = []
for repo in ctx.all_repositories:
repo_dir = repo.get("directory")
if not repo_dir:
try:
repo_dir = get_repo_dir(ctx.repositories_base_dir, repo)
except Exception:
repo_dir = None
if not repo_dir:
continue
repo_dir_abs = os.path.abspath(os.path.expanduser(repo_dir))
if cwd == repo_dir_abs or cwd.startswith(repo_dir_abs + os.sep):
candidates.append((repo_dir_abs, repo))
if not candidates:
return []
# Pick the repo with the longest (most specific) path.
candidates.sort(key=lambda item: len(item[0]), reverse=True)
return [candidates[0][1]]
def dispatch_command(args, ctx: CLIContext) -> None:
"""
Top-level command dispatcher.
Responsible for:
- computing selected repositories (where applicable)
- delegating to the correct command handler module
Dispatch the parsed arguments to the appropriate command handler.
"""
# 1) Proxy commands (git, docker, docker compose) short-circuit.
# First: proxy commands (git / docker / docker compose / make wrapper etc.)
if maybe_handle_proxy(args, ctx):
return
# 2) Determine if this command uses repository selection.
# Commands that operate on repository selections
commands_with_selection: List[str] = [
"install",
"update",
@@ -41,26 +99,33 @@ def dispatch_command(args, ctx: CLIContext) -> None:
"status",
"path",
"shell",
"code",
"explore",
"terminal",
"create",
"list",
"make",
"release",
"version",
"make",
"changelog",
# intentionally NOT "branch" it operates on cwd only
"explore",
"terminal",
"code",
]
if args.command in commands_with_selection:
selected = get_selected_repos(
getattr(args, "all", False),
ctx.all_repositories,
getattr(args, "identifiers", []),
)
if getattr(args, "command", None) in commands_with_selection:
if _has_explicit_selection(args):
# Classic selection logic (identifiers / --all / filters)
selected = get_selected_repos(args, ctx.all_repositories)
else:
# Default per help text: repository of current folder.
selected = _select_repo_for_current_directory(ctx)
# If none is found, leave 'selected' empty.
# Individual handlers will then emit a clear message instead
# of silently picking an unrelated repository.
else:
selected = []
# 3) Delegate based on command.
# ------------------------------------------------------------------ #
# Repos-related commands
# ------------------------------------------------------------------ #
if args.command in (
"install",
"update",
@@ -73,22 +138,41 @@ def dispatch_command(args, ctx: CLIContext) -> None:
"list",
):
handle_repos_command(args, ctx, selected)
elif args.command in ("code", "explore", "terminal"):
return
# ------------------------------------------------------------------ #
# Tools (explore / terminal / code)
# ------------------------------------------------------------------ #
if args.command in ("explore", "terminal", "code"):
handle_tools_command(args, ctx, selected)
elif args.command == "release":
return
# ------------------------------------------------------------------ #
# Release / Version / Changelog / Config / Make / Branch
# ------------------------------------------------------------------ #
if args.command == "release":
handle_release(args, ctx, selected)
elif args.command == "version":
return
if args.command == "version":
handle_version(args, ctx, selected)
elif args.command == "changelog":
return
if args.command == "changelog":
handle_changelog(args, ctx, selected)
elif args.command == "config":
return
if args.command == "config":
handle_config(args, ctx)
elif args.command == "make":
return
if args.command == "make":
handle_make(args, ctx, selected)
elif args.command == "branch":
# Branch commands currently operate on the current working
# directory only, not on the pkgmgr repository selection.
return
if args.command == "branch":
handle_branch(args, ctx)
else:
print(f"Unknown command: {args.command}")
sys.exit(2)
return
print(f"Unknown command: {args.command}")
sys.exit(2)

View File

@@ -1,3 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import argparse
@@ -12,13 +15,20 @@ class SortedSubParsersAction(argparse._SubParsersAction):
def add_parser(self, name, **kwargs):
parser = super().add_parser(name, **kwargs)
# Sort choices alphabetically by dest (subcommand name)
self._choices_actions.sort(key=lambda a: a.dest)
return parser
def add_identifier_arguments(subparser: argparse.ArgumentParser) -> None:
"""
Attach generic repository selection arguments to a subparser.
Common identifier / selection arguments for many subcommands.
Selection modes (mutual intent, not hard-enforced):
- identifiers (positional): select by alias / provider/account/repo
- --all: select all repositories
- --category / --string / --tag: filter-based selection on top
of the full repository set
"""
subparser.add_argument(
"identifiers",
@@ -39,6 +49,33 @@ def add_identifier_arguments(subparser: argparse.ArgumentParser) -> None:
"yes | pkgmgr {subcommand} --all"
),
)
subparser.add_argument(
"--category",
nargs="+",
default=[],
help=(
"Filter repositories by category patterns derived from config "
"filenames or repo metadata (use filename without .yml/.yaml, "
"or /regex/ to use a regular expression)."
),
)
subparser.add_argument(
"--string",
default="",
help=(
"Filter repositories whose identifier / name / path contains this "
"substring (case-insensitive). Use /regex/ for regular expressions."
),
)
subparser.add_argument(
"--tag",
action="append",
default=[],
help=(
"Filter repositories by tag. Matches tags from the repository "
"collector and category tags. Use /regex/ for regular expressions."
),
)
subparser.add_argument(
"--preview",
action="store_true",
@@ -61,7 +98,7 @@ def add_identifier_arguments(subparser: argparse.ArgumentParser) -> None:
def add_install_update_arguments(subparser: argparse.ArgumentParser) -> None:
"""
Attach shared flags for install/update-like commands.
Common arguments for install/update commands.
"""
add_identifier_arguments(subparser)
subparser.add_argument(
@@ -94,10 +131,7 @@ def add_install_update_arguments(subparser: argparse.ArgumentParser) -> None:
def create_parser(description_text: str) -> argparse.ArgumentParser:
"""
Create and configure the top-level argument parser for pkgmgr.
This function defines *only* the CLI surface (arguments & subcommands),
but no business logic.
Create the top-level argument parser for pkgmgr.
"""
parser = argparse.ArgumentParser(
description=description_text,
@@ -110,7 +144,7 @@ def create_parser(description_text: str) -> argparse.ArgumentParser:
)
# ------------------------------------------------------------
# install / update
# install / update / deinstall / delete
# ------------------------------------------------------------
install_parser = subparsers.add_parser(
"install",
@@ -129,9 +163,6 @@ def create_parser(description_text: str) -> argparse.ArgumentParser:
help="Include system update commands",
)
# ------------------------------------------------------------
# deinstall / delete
# ------------------------------------------------------------
deinstall_parser = subparsers.add_parser(
"deinstall",
help="Remove alias links to repository/repositories",
@@ -147,7 +178,7 @@ def create_parser(description_text: str) -> argparse.ArgumentParser:
# ------------------------------------------------------------
# create
# ------------------------------------------------------------
create_parser = subparsers.add_parser(
create_cmd_parser = subparsers.add_parser(
"create",
help=(
"Create new repository entries: add them to the config if not "
@@ -155,8 +186,8 @@ def create_parser(description_text: str) -> argparse.ArgumentParser:
"remotely if --remote is set."
),
)
add_identifier_arguments(create_parser)
create_parser.add_argument(
add_identifier_arguments(create_cmd_parser)
create_cmd_parser.add_argument(
"--remote",
action="store_true",
help="If set, add the remote and push the initial commit.",
@@ -228,6 +259,14 @@ def create_parser(description_text: str) -> argparse.ArgumentParser:
help="Set ignore to true or false",
)
config_subparsers.add_parser(
"update",
help=(
"Update default config files in ~/.config/pkgmgr/ from the "
"installed pkgmgr package (does not touch config.yaml)."
),
)
# ------------------------------------------------------------
# path / explore / terminal / code / shell
# ------------------------------------------------------------
@@ -265,7 +304,10 @@ def create_parser(description_text: str) -> argparse.ArgumentParser:
"--command",
nargs=argparse.REMAINDER,
dest="shell_command",
help="The shell command (and its arguments) to execute in each repository",
help=(
"The shell command (and its arguments) to execute in each "
"repository"
),
default=[],
)
@@ -274,7 +316,7 @@ def create_parser(description_text: str) -> argparse.ArgumentParser:
# ------------------------------------------------------------
branch_parser = subparsers.add_parser(
"branch",
help="Branch-related utilities (e.g. open feature branches)",
help="Branch-related utilities (e.g. open/close feature branches)",
)
branch_subparsers = branch_parser.add_subparsers(
dest="subcommand",
@@ -289,7 +331,10 @@ def create_parser(description_text: str) -> argparse.ArgumentParser:
branch_open.add_argument(
"name",
nargs="?",
help="Name of the new branch (optional; will be asked interactively if omitted)",
help=(
"Name of the new branch (optional; will be asked interactively "
"if omitted)"
),
)
branch_open.add_argument(
"--base",
@@ -297,6 +342,26 @@ def create_parser(description_text: str) -> argparse.ArgumentParser:
help="Base branch to create the new branch from (default: main)",
)
branch_close = branch_subparsers.add_parser(
"close",
help="Merge a feature branch into base and delete it",
)
branch_close.add_argument(
"name",
nargs="?",
help=(
"Name of the branch to close (optional; current branch is used "
"if omitted)"
),
)
branch_close.add_argument(
"--base",
default="main",
help=(
"Base branch to merge into (default: main; falls back to master "
"internally if main does not exist)"
),
)
# ------------------------------------------------------------
# release
@@ -316,12 +381,32 @@ def create_parser(description_text: str) -> argparse.ArgumentParser:
release_parser.add_argument(
"-m",
"--message",
default="",
default=None,
help=(
"Optional release message to add to the changelog and tag."
),
)
# Generic selection / preview / list / extra_args
add_identifier_arguments(release_parser)
# Close current branch after successful release
release_parser.add_argument(
"--close",
action="store_true",
help=(
"Close the current branch after a successful release in each "
"repository, if it is not main/master."
),
)
# Force: skip preview+confirmation and run release directly
release_parser.add_argument(
"-f",
"--force",
action="store_true",
help=(
"Skip the interactive preview+confirmation step and run the "
"release directly."
),
)
# ------------------------------------------------------------
# version
@@ -330,7 +415,8 @@ def create_parser(description_text: str) -> argparse.ArgumentParser:
"version",
help=(
"Show version information for repository/ies "
"(git tags, pyproject.toml, flake.nix, PKGBUILD, debian, spec, Ansible Galaxy)."
"(git tags, pyproject.toml, flake.nix, PKGBUILD, debian, spec, "
"Ansible Galaxy)."
),
)
add_identifier_arguments(version_parser)
@@ -364,20 +450,29 @@ def create_parser(description_text: str) -> argparse.ArgumentParser:
"list",
help="List all repositories with details and status",
)
list_parser.add_argument(
"--search",
default="",
help="Filter repositories that contain the given string",
)
# dieselbe Selektionslogik wie bei install/update/etc.:
add_identifier_arguments(list_parser)
list_parser.add_argument(
"--status",
type=str,
default="",
help="Filter repositories by status (case insensitive)",
help=(
"Filter repositories by status (case insensitive). "
"Use /regex/ for regular expressions."
),
)
list_parser.add_argument(
"--description",
action="store_true",
help=(
"Show an additional detailed section per repository "
"(description, homepage, tags, categories, paths)."
),
)
# ------------------------------------------------------------
# make (wrapper around make in repositories)
# make
# ------------------------------------------------------------
make_parser = subparsers.add_parser(
"make",
@@ -403,7 +498,7 @@ def create_parser(description_text: str) -> argparse.ArgumentParser:
add_identifier_arguments(make_deinstall)
# ------------------------------------------------------------
# Proxy commands (git, docker, docker compose)
# Proxy commands (git, docker, docker compose, ...)
# ------------------------------------------------------------
register_proxy_commands(subparsers)

View File

@@ -1,14 +1,19 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import argparse
import os
import sys
from typing import Dict, List
from typing import Dict, List, Any
from pkgmgr.cli_core.context import CLIContext
from pkgmgr.clone_repos import clone_repos
from pkgmgr.exec_proxy_command import exec_proxy_command
from pkgmgr.get_selected_repos import get_selected_repos
from pkgmgr.pull_with_verification import pull_with_verification
from pkgmgr.get_selected_repos import get_selected_repos
from pkgmgr.get_repo_dir import get_repo_dir
PROXY_COMMANDS: Dict[str, List[str]] = {
@@ -42,10 +47,7 @@ PROXY_COMMANDS: Dict[str, List[str]] = {
def _add_proxy_identifier_arguments(parser: argparse.ArgumentParser) -> None:
"""
Local copy of the identifier argument set for proxy commands.
This duplicates the semantics of cli.parser.add_identifier_arguments
to avoid circular imports.
Selection arguments for proxy subcommands.
"""
parser.add_argument(
"identifiers",
@@ -66,6 +68,24 @@ def _add_proxy_identifier_arguments(parser: argparse.ArgumentParser) -> None:
"yes | pkgmgr {subcommand} --all"
),
)
parser.add_argument(
"--category",
nargs="+",
default=[],
help=(
"Filter repositories by category patterns derived from config "
"filenames or repo metadata (use filename without .yml/.yaml, "
"or /regex/ to use a regular expression)."
),
)
parser.add_argument(
"--string",
default="",
help=(
"Filter repositories whose identifier / name / path contains this "
"substring (case-insensitive). Use /regex/ for regular expressions."
),
)
parser.add_argument(
"--preview",
action="store_true",
@@ -86,12 +106,62 @@ def _add_proxy_identifier_arguments(parser: argparse.ArgumentParser) -> None:
)
def _proxy_has_explicit_selection(args: argparse.Namespace) -> bool:
"""
Same semantics as in the main dispatch:
True if the user explicitly selected repositories.
"""
identifiers = getattr(args, "identifiers", []) or []
use_all = getattr(args, "all", False)
categories = getattr(args, "category", []) or []
string_filter = getattr(args, "string", "") or ""
# Proxy commands currently do not support --tag, so it is not checked here.
return bool(
use_all
or identifiers
or categories
or string_filter
)
def _select_repo_for_current_directory(
ctx: CLIContext,
) -> List[Dict[str, Any]]:
"""
Heuristic: find the repository whose local directory matches the
current working directory or is the closest parent.
"""
cwd = os.path.abspath(os.getcwd())
candidates: List[tuple[str, Dict[str, Any]]] = []
for repo in ctx.all_repositories:
repo_dir = repo.get("directory")
if not repo_dir:
try:
repo_dir = get_repo_dir(ctx.repositories_base_dir, repo)
except Exception:
repo_dir = None
if not repo_dir:
continue
repo_dir_abs = os.path.abspath(os.path.expanduser(repo_dir))
if cwd == repo_dir_abs or cwd.startswith(repo_dir_abs + os.sep):
candidates.append((repo_dir_abs, repo))
if not candidates:
return []
# Pick the repo with the longest (most specific) path.
candidates.sort(key=lambda item: len(item[0]), reverse=True)
return [candidates[0][1]]
def register_proxy_commands(
subparsers: argparse._SubParsersAction,
) -> None:
"""
Register proxy commands (git, docker, docker compose) as
top-level subcommands on the given subparsers.
Register proxy subcommands for git, docker, docker compose, ...
"""
for command, subcommands in PROXY_COMMANDS.items():
for subcommand in subcommands:
@@ -100,7 +170,8 @@ def register_proxy_commands(
help=f"Proxies '{command} {subcommand}' to repository/ies",
description=(
f"Executes '{command} {subcommand}' for the "
"identified repos.\nTo recieve more help execute "
"selected repositories. "
"For more details see the underlying tool's help: "
f"'{command} {subcommand} --help'"
),
formatter_class=argparse.RawTextHelpFormatter,
@@ -129,8 +200,8 @@ def register_proxy_commands(
def maybe_handle_proxy(args: argparse.Namespace, ctx: CLIContext) -> bool:
"""
If the parsed command is a proxy command, execute it and return True.
Otherwise return False to let the main dispatcher continue.
If the top-level command is one of the proxy subcommands
(git / docker / docker compose), handle it here and return True.
"""
all_proxy_subcommands = {
sub for subs in PROXY_COMMANDS.values() for sub in subs
@@ -139,12 +210,17 @@ def maybe_handle_proxy(args: argparse.Namespace, ctx: CLIContext) -> bool:
if args.command not in all_proxy_subcommands:
return False
# Use generic selection semantics for proxies
selected = get_selected_repos(
getattr(args, "all", False),
ctx.all_repositories,
getattr(args, "identifiers", []),
)
# Default semantics: without explicit selection → repo of current folder.
if _proxy_has_explicit_selection(args):
selected = get_selected_repos(args, ctx.all_repositories)
else:
selected = _select_repo_for_current_directory(ctx)
if not selected:
print(
"[ERROR] No repository matches the current directory. "
"Specify identifiers or use --all/--category/--string."
)
sys.exit(1)
for command, subcommands in PROXY_COMMANDS.items():
if args.command not in subcommands:

View File

@@ -1,46 +1,122 @@
import subprocess
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Initialize user configuration by scanning the repositories base directory.
This module scans the path:
defaults_config["directories"]["repositories"]
with the expected structure:
{base}/{provider}/{account}/{repository}
For each discovered repository, the function:
• derives provider, account, repository from the folder structure
• (optionally) determines the latest commit hash via git log
• generates a unique CLI alias
• marks ignore=True for newly discovered repos
• skips repos already known in defaults or user config
"""
from __future__ import annotations
import os
import subprocess
from typing import Any, Dict
from pkgmgr.generate_alias import generate_alias
from pkgmgr.save_user_config import save_user_config
def config_init(user_config, defaults_config, bin_dir,USER_CONFIG_PATH:str):
def config_init(
user_config: Dict[str, Any],
defaults_config: Dict[str, Any],
bin_dir: str,
user_config_path: str,
) -> None:
"""
Scan the base directory (defaults_config["base"]) for repositories.
The folder structure is assumed to be:
{base}/{provider}/{account}/{repository}
For each repository found, automatically determine:
- provider, account, repository from folder names.
- verified: the latest commit (via 'git log -1 --format=%H').
- alias: generated from the repository name using generate_alias().
Repositories already defined in defaults_config["repositories"] or user_config["repositories"] are skipped.
Scan the repositories base directory and add missing entries
to the user configuration.
"""
repositories_base_dir = os.path.expanduser(defaults_config["directories"]["repositories"])
# ------------------------------------------------------------
# Announce where we will write the result
# ------------------------------------------------------------
print("============================================================")
print(f"[INIT] Writing user configuration to:")
print(f" {user_config_path}")
print("============================================================")
repositories_base_dir = os.path.expanduser(
defaults_config["directories"]["repositories"]
)
print(f"[INIT] Scanning repository base directory:")
print(f" {repositories_base_dir}")
print("")
if not os.path.isdir(repositories_base_dir):
print(f"Base directory '{repositories_base_dir}' does not exist.")
print(f"[ERROR] Base directory does not exist: {repositories_base_dir}")
return
default_keys = {(entry.get("provider"), entry.get("account"), entry.get("repository"))
for entry in defaults_config.get("repositories", [])}
existing_keys = {(entry.get("provider"), entry.get("account"), entry.get("repository"))
for entry in user_config.get("repositories", [])}
existing_aliases = {entry.get("alias") for entry in user_config.get("repositories", []) if entry.get("alias")}
default_keys = {
(entry.get("provider"), entry.get("account"), entry.get("repository"))
for entry in defaults_config.get("repositories", [])
}
existing_keys = {
(entry.get("provider"), entry.get("account"), entry.get("repository"))
for entry in user_config.get("repositories", [])
}
existing_aliases = {
entry.get("alias")
for entry in user_config.get("repositories", [])
if entry.get("alias")
}
new_entries = []
scanned = 0
skipped = 0
# ------------------------------------------------------------
# Actual scanning
# ------------------------------------------------------------
for provider in os.listdir(repositories_base_dir):
provider_path = os.path.join(repositories_base_dir, provider)
if not os.path.isdir(provider_path):
continue
print(f"[SCAN] Provider: {provider}")
for account in os.listdir(provider_path):
account_path = os.path.join(provider_path, account)
if not os.path.isdir(account_path):
continue
print(f"[SCAN] Account: {account}")
for repo_name in os.listdir(account_path):
repo_path = os.path.join(account_path, repo_name)
if not os.path.isdir(repo_path):
continue
scanned += 1
key = (provider, account, repo_name)
if key in default_keys or key in existing_keys:
# Already known?
if key in default_keys:
skipped += 1
print(f"[SKIP] (defaults) {provider}/{account}/{repo_name}")
continue
if key in existing_keys:
skipped += 1
print(f"[SKIP] (user-config) {provider}/{account}/{repo_name}")
continue
print(f"[ADD] {provider}/{account}/{repo_name}")
# Determine commit hash
try:
result = subprocess.run(
["git", "log", "-1", "--format=%H"],
@@ -51,25 +127,55 @@ def config_init(user_config, defaults_config, bin_dir,USER_CONFIG_PATH:str):
check=True,
)
verified = result.stdout.strip()
except Exception as e:
print(f"[INFO] Latest commit: {verified}")
except Exception as exc:
verified = ""
print(f"Could not determine latest commit for {repo_name} ({provider}/{account}): {e}")
print(f"[WARN] Could not read commit: {exc}")
entry = {
"provider": provider,
"account": account,
"repository": repo_name,
"verified": {"commit": verified},
"ignore": True
"ignore": True,
}
alias = generate_alias({"repository": repo_name, "provider": provider, "account": account}, bin_dir, existing_aliases)
# Alias generation
alias = generate_alias(
{
"repository": repo_name,
"provider": provider,
"account": account,
},
bin_dir,
existing_aliases,
)
entry["alias"] = alias
existing_aliases.add(alias)
new_entries.append(entry)
print(f"Adding new repo entry: {entry}")
print(f"[INFO] Alias generated: {alias}")
new_entries.append(entry)
print("") # blank line between accounts
# ------------------------------------------------------------
# Summary
# ------------------------------------------------------------
print("============================================================")
print(f"[DONE] Scanned repositories: {scanned}")
print(f"[DONE] Skipped (known): {skipped}")
print(f"[DONE] New entries discovered: {len(new_entries)}")
print("============================================================")
# ------------------------------------------------------------
# Save if needed
# ------------------------------------------------------------
if new_entries:
user_config.setdefault("repositories", []).extend(new_entries)
save_user_config(user_config,USER_CONFIG_PATH)
save_user_config(user_config, user_config_path)
print(f"[SAVE] Wrote user configuration to:")
print(f" {user_config_path}")
else:
print("No new repositories found.")
print("[INFO] No new repositories were added.")
print("============================================================")

View File

@@ -1,29 +1,170 @@
import os
import sys
from .resolve_repos import resolve_repos
from .filter_ignored import filter_ignored
from .get_repo_dir import get_repo_dir
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import os
import re
from typing import Any, Dict, List, Sequence
from pkgmgr.resolve_repos import resolve_repos
Repository = Dict[str, Any]
def _compile_maybe_regex(pattern: str):
"""
If pattern is of the form /.../, return a compiled regex (case-insensitive).
Otherwise return None.
"""
if len(pattern) >= 2 and pattern.startswith("/") and pattern.endswith("/"):
try:
return re.compile(pattern[1:-1], re.IGNORECASE)
except re.error:
return None
return None
def _match_pattern(value: str, pattern: str) -> bool:
"""
Match a value against a pattern that may be a substring or /regex/.
"""
if not pattern:
return True
regex = _compile_maybe_regex(pattern)
if regex:
return bool(regex.search(value))
return pattern.lower() in value.lower()
def _match_any(values: Sequence[str], pattern: str) -> bool:
"""
Return True if any of the values matches the pattern.
"""
for v in values:
if _match_pattern(v, pattern):
return True
return False
def _build_identifier_string(repo: Repository) -> str:
"""
Build a combined identifier string for string-based filtering.
"""
provider = str(repo.get("provider", ""))
account = str(repo.get("account", ""))
repository = str(repo.get("repository", ""))
alias = str(repo.get("alias", ""))
description = str(repo.get("description", ""))
directory = str(repo.get("directory", ""))
parts = [
provider,
account,
repository,
alias,
f"{provider}/{account}/{repository}",
description,
directory,
]
return " ".join(p for p in parts if p)
def _apply_filters(
repos: List[Repository],
string_pattern: str,
category_patterns: List[str],
tag_patterns: List[str],
) -> List[Repository]:
if not string_pattern and not category_patterns and not tag_patterns:
return repos
filtered: List[Repository] = []
for repo in repos:
# String filter
if string_pattern:
ident_str = _build_identifier_string(repo)
if not _match_pattern(ident_str, string_pattern):
continue
# Category filter: nur echte Kategorien, KEINE Tags
if category_patterns:
cats: List[str] = []
cats.extend(map(str, repo.get("category_files", [])))
if "category" in repo:
cats.append(str(repo["category"]))
if not cats:
continue
ok = True
for pat in category_patterns:
if not _match_any(cats, pat):
ok = False
break
if not ok:
continue
# Tag filter: ausschließlich YAML-Tags
if tag_patterns:
tags: List[str] = list(map(str, repo.get("tags", [])))
if not tags:
continue
ok = True
for pat in tag_patterns:
if not _match_any(tags, pat):
ok = False
break
if not ok:
continue
filtered.append(repo)
def get_selected_repos(show_all: bool, all_repos_list, identifiers=None):
if show_all:
selected = all_repos_list
else:
selected = resolve_repos(identifiers, all_repos_list)
# If no repositories were found using the provided identifiers,
# try to automatically select based on the current directory:
if not selected:
current_dir = os.getcwd()
directory_name = os.path.basename(current_dir)
# Pack the directory name in a list since resolve_repos expects a list.
auto_selected = resolve_repos([directory_name], all_repos_list)
if auto_selected:
# Check if the path of the first auto-selected repository matches the current directory.
if os.path.abspath(auto_selected[0].get("directory")) == os.path.abspath(current_dir):
print(f"Repository {auto_selected[0]['repository']} has been auto-selected by path.")
selected = auto_selected
filtered = filter_ignored(selected)
if not filtered:
print("Error: No repositories had been selected.")
sys.exit(4)
return filtered
def get_selected_repos(args, all_repositories: List[Repository]) -> List[Repository]:
"""
Compute the list of repositories selected by CLI arguments.
Modes:
- If identifiers are given: select via resolve_repos() from all_repositories.
- Else if any of --category/--string/--tag is used: start from all_repositories
and apply filters.
- Else if --all is set: select all_repositories.
- Else: try to select the repository of the current working directory.
"""
identifiers: List[str] = getattr(args, "identifiers", []) or []
use_all: bool = bool(getattr(args, "all", False))
category_patterns: List[str] = getattr(args, "category", []) or []
string_pattern: str = getattr(args, "string", "") or ""
tag_patterns: List[str] = getattr(args, "tag", []) or []
has_filters = bool(category_patterns or string_pattern or tag_patterns)
# 1) Explicit identifiers win
if identifiers:
base = resolve_repos(identifiers, all_repositories)
return _apply_filters(base, string_pattern, category_patterns, tag_patterns)
# 2) Filter-only mode: start from all repositories
if has_filters:
return _apply_filters(list(all_repositories), string_pattern, category_patterns, tag_patterns)
# 3) --all (no filters): all repos
if use_all:
return list(all_repositories)
# 4) Fallback: try to select repository of current working directory
cwd = os.path.abspath(os.getcwd())
by_dir = [
repo
for repo in all_repositories
if os.path.abspath(str(repo.get("directory", ""))) == cwd
]
if by_dir:
return by_dir
# No specific match -> empty list
return []

View File

@@ -1,108 +1,352 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Pretty-print repository list with status, categories, tags and path.
- Tags come exclusively from YAML: repo["tags"].
- Categories come from repo["category_files"] (YAML file names without
.yml/.yaml) and optional repo["category"].
- Optional detail mode (--description) prints an extended section per
repository with description, homepage, etc.
"""
from __future__ import annotations
import os
from pkgmgr.get_repo_identifier import get_repo_identifier
from pkgmgr.get_repo_dir import get_repo_dir
import re
from textwrap import wrap
from typing import Any, Dict, List, Optional
def list_repositories(all_repos, repositories_base_dir, bin_dir, search_filter="", status_filter=""):
Repository = Dict[str, Any]
RESET = "\033[0m"
BOLD = "\033[1m"
DIM = "\033[2m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
RED = "\033[31m"
MAGENTA = "\033[35m"
GREY = "\033[90m"
def _compile_maybe_regex(pattern: str) -> Optional[re.Pattern[str]]:
"""
Lists all repositories with their attributes and status information.
The repositories are sorted in ascending order by their identifier.
Parameters:
all_repos (list): List of repository configurations.
repositories_base_dir (str): The base directory where repositories are located.
bin_dir (str): The directory where executable wrappers are stored.
search_filter (str): Filter for repository attributes (case insensitive).
status_filter (str): Filter for computed status info (case insensitive).
For each repository, the identifier is printed in bold, the description (if available)
in italic, then all other attributes and computed status are printed.
If the repository is installed, a hint is displayed under the attributes.
Repositories are filtered out if either the search_filter is not found in any attribute or
if the status_filter is not found in the computed status string.
If pattern is of the form /.../, return a compiled regex (case-insensitive).
Otherwise return None.
"""
search_filter = search_filter.lower() if search_filter else ""
status_filter = status_filter.lower() if status_filter else ""
if not pattern:
return None
if len(pattern) >= 2 and pattern.startswith("/") and pattern.endswith("/"):
try:
return re.compile(pattern[1:-1], re.IGNORECASE)
except re.error:
return None
return None
# Define status colors using colors not used for other attributes:
# Avoid red (for ignore), blue (for homepage) and yellow (for verified).
status_colors = {
"Installed": "\033[1;32m", # Green
"Not Installed": "\033[1;35m", # Magenta
"Cloned": "\033[1;36m", # Cyan
"Clonable": "\033[1;37m", # White
"Ignored": "\033[38;5;208m", # Orange (extended)
"Active": "\033[38;5;129m", # Light Purple (extended)
"Installable": "\033[38;5;82m" # Light Green (extended)
}
# Sort all repositories by their identifier in ascending order.
sorted_repos = sorted(all_repos, key=lambda repo: get_repo_identifier(repo, all_repos))
def _status_matches(status: str, status_filter: str) -> bool:
"""
Match a status string against an optional filter (substring or /regex/).
"""
if not status_filter:
return True
for repo in sorted_repos:
# Combine all attribute values into one string for filtering.
repo_text = " ".join(str(v) for v in repo.values()).lower()
if search_filter and search_filter not in repo_text:
regex = _compile_maybe_regex(status_filter)
if regex:
return bool(regex.search(status))
return status_filter.lower() in status.lower()
def _compute_repo_dir(repositories_base_dir: str, repo: Repository) -> str:
"""
Compute the local directory for a repository.
If the repository already has a 'directory' key, that is used;
otherwise the path is constructed from provider/account/repository
under repositories_base_dir.
"""
if repo.get("directory"):
return os.path.expanduser(str(repo["directory"]))
provider = str(repo.get("provider", ""))
account = str(repo.get("account", ""))
repository = str(repo.get("repository", ""))
return os.path.join(
os.path.expanduser(repositories_base_dir),
provider,
account,
repository,
)
def _compute_status(
repo: Repository,
repo_dir: str,
binaries_dir: str,
) -> str:
"""
Compute a human-readable status string, e.g. 'present,alias,ignored'.
"""
parts: List[str] = []
exists = os.path.isdir(repo_dir)
if exists:
parts.append("present")
else:
parts.append("absent")
alias = repo.get("alias")
if alias:
alias_path = os.path.join(os.path.expanduser(binaries_dir), str(alias))
if os.path.exists(alias_path):
parts.append("alias")
else:
parts.append("alias-missing")
if repo.get("ignore"):
parts.append("ignored")
return ",".join(parts) if parts else "-"
def _color_status(status_padded: str) -> str:
"""
Color individual status flags inside a padded status string.
Input is expected to be right-padded to the column width.
Color mapping:
- present -> green
- absent -> red
- alias -> red
- alias-missing -> red
- ignored -> magenta
- other -> default
"""
core = status_padded.rstrip()
pad_spaces = len(status_padded) - len(core)
plain_parts = core.split(",") if core else []
colored_parts: List[str] = []
for raw_part in plain_parts:
name = raw_part.strip()
if not name:
continue
# Compute status information for the repository.
identifier = get_repo_identifier(repo, all_repos)
executable_path = os.path.join(bin_dir, identifier)
repo_dir = get_repo_dir(repositories_base_dir, repo)
status_list = []
# Check if the executable exists (Installed).
if os.path.exists(executable_path):
status_list.append("Installed")
if name == "present":
color = GREEN
elif name == "absent":
color = MAGENTA
elif name in ("alias", "alias-missing"):
color = YELLOW
elif name == "ignored":
color = MAGENTA
else:
status_list.append("Not Installed")
# Check if the repository directory exists (Cloned).
if os.path.exists(repo_dir):
status_list.append("Cloned")
else:
status_list.append("Clonable")
# Mark ignored repositories.
if repo.get("ignore", False):
status_list.append("Ignored")
else:
status_list.append("Active")
# Define installable as cloned but not installed.
if os.path.exists(repo_dir) and not os.path.exists(executable_path):
status_list.append("Installable")
color = ""
# Build a colored status string.
colored_statuses = [f"{status_colors.get(s, '')}{s}\033[0m" for s in status_list]
status_str = ", ".join(colored_statuses)
if color:
colored_parts.append(f"{color}{name}{RESET}")
else:
colored_parts.append(name)
# If a status_filter is provided, only display repos whose status contains the filter.
if status_filter and status_filter not in status_str.lower():
colored_core = ",".join(colored_parts)
return colored_core + (" " * pad_spaces)
def list_repositories(
repositories: List[Repository],
repositories_base_dir: str,
binaries_dir: str,
search_filter: str = "",
status_filter: str = "",
extra_tags: Optional[List[str]] = None,
show_description: bool = False,
) -> None:
"""
Print a table of repositories and (optionally) detailed descriptions.
Parameters
----------
repositories:
Repositories to show (usually already filtered by get_selected_repos).
repositories_base_dir:
Base directory where repositories live.
binaries_dir:
Directory where alias symlinks live.
search_filter:
Optional substring/regex filter on identifier and metadata.
status_filter:
Optional filter on computed status.
extra_tags:
Additional tags to show for each repository (CLI overlay only).
show_description:
If True, print a detailed block for each repository after the table.
"""
if extra_tags is None:
extra_tags = []
search_regex = _compile_maybe_regex(search_filter)
rows: List[Dict[str, Any]] = []
# ------------------------------------------------------------------
# Build rows
# ------------------------------------------------------------------
for repo in repositories:
identifier = str(repo.get("repository") or repo.get("alias") or "")
alias = str(repo.get("alias") or "")
provider = str(repo.get("provider") or "")
account = str(repo.get("account") or "")
description = str(repo.get("description") or "")
homepage = str(repo.get("homepage") or "")
repo_dir = _compute_repo_dir(repositories_base_dir, repo)
status = _compute_status(repo, repo_dir, binaries_dir)
if not _status_matches(status, status_filter):
continue
# Display repository details:
# Print the identifier in bold.
print(f"\033[1m{identifier}\033[0m")
# Print the description in italic if it exists.
description = repo.get("description")
if search_filter:
haystack = " ".join(
[
identifier,
alias,
provider,
account,
description,
homepage,
repo_dir,
]
)
if search_regex:
if not search_regex.search(haystack):
continue
else:
if search_filter.lower() not in haystack.lower():
continue
categories: List[str] = []
categories.extend(map(str, repo.get("category_files", [])))
if repo.get("category"):
categories.append(str(repo["category"]))
yaml_tags: List[str] = list(map(str, repo.get("tags", [])))
display_tags: List[str] = sorted(
set(yaml_tags + list(map(str, extra_tags)))
)
rows.append(
{
"repo": repo,
"identifier": identifier,
"status": status,
"categories": categories,
"tags": display_tags,
"dir": repo_dir,
}
)
if not rows:
print("No repositories matched the given filters.")
return
# ------------------------------------------------------------------
# Table section (header grey, values white, per-flag colored status)
# ------------------------------------------------------------------
ident_width = max(len("IDENTIFIER"), max(len(r["identifier"]) for r in rows))
status_width = max(len("STATUS"), max(len(r["status"]) for r in rows))
cat_width = max(
len("CATEGORIES"),
max((len(",".join(r["categories"])) for r in rows), default=0),
)
tag_width = max(
len("TAGS"),
max((len(",".join(r["tags"])) for r in rows), default=0),
)
header = (
f"{GREY}{BOLD}"
f"{'IDENTIFIER'.ljust(ident_width)} "
f"{'STATUS'.ljust(status_width)} "
f"{'CATEGORIES'.ljust(cat_width)} "
f"{'TAGS'.ljust(tag_width)} "
f"DIR"
f"{RESET}"
)
print(header)
print("-" * (ident_width + status_width + cat_width + tag_width + 10 + 40))
for r in rows:
ident_col = r["identifier"].ljust(ident_width)
cat_col = ",".join(r["categories"]).ljust(cat_width)
tag_col = ",".join(r["tags"]).ljust(tag_width)
dir_col = r["dir"]
status = r["status"]
status_padded = status.ljust(status_width)
status_colored = _color_status(status_padded)
print(
f"{ident_col} "
f"{status_colored} "
f"{cat_col} "
f"{tag_col} "
f"{dir_col}"
)
# ------------------------------------------------------------------
# Detailed section (alias value red, same status coloring)
# ------------------------------------------------------------------
if not show_description:
return
print()
print(f"{BOLD}Detailed repository information:{RESET}")
print()
for r in rows:
repo = r["repo"]
identifier = r["identifier"]
alias = str(repo.get("alias") or "")
provider = str(repo.get("provider") or "")
account = str(repo.get("account") or "")
repository = str(repo.get("repository") or "")
description = str(repo.get("description") or "")
homepage = str(repo.get("homepage") or "")
categories = r["categories"]
tags = r["tags"]
repo_dir = r["dir"]
status = r["status"]
print(f"{BOLD}{identifier}{RESET}")
print(f" Provider: {provider}")
print(f" Account: {account}")
print(f" Repository: {repository}")
# Alias value highlighted in red
if alias:
print(f" Alias: {RED}{alias}{RESET}")
status_colored = _color_status(status)
print(f" Status: {status_colored}")
if categories:
print(f" Categories: {', '.join(categories)}")
if tags:
print(f" Tags: {', '.join(tags)}")
print(f" Directory: {repo_dir}")
if homepage:
print(f" Homepage: {homepage}")
if description:
print(f"\n\033[3m{description}\033[0m")
print("\nAttributes:")
# Loop through all attributes.
for key, value in repo.items():
formatted_value = str(value)
# Special formatting for the "verified" attribute (yellow).
if key == "verified" and value:
formatted_value = f"\033[1;33m{value}\033[0m"
# Special formatting for the "ignore" flag (red if True).
if key == "ignore" and value:
formatted_value = f"\033[1;31m{value}\033[0m"
if key == "description":
continue
# Highlight homepage in blue.
if key.lower() == "homepage" and value:
formatted_value = f"\033[1;34m{value}\033[0m"
print(f" {key}: {formatted_value}")
# Always display the computed status.
print(f" Status: {status_str}")
# If the repository is installed, display a hint for more info.
if os.path.exists(executable_path):
print(f"\nMore information and help: \033[1;4mpkgmgr {identifier} --help\033[0m\n")
print("-" * 40)
print(" Description:")
for line in wrap(description, width=78):
print(f" {line}")
print()

View File

@@ -1,30 +1,305 @@
import sys
import yaml
import os
from .get_repo_dir import get_repo_dir
DEFAULT_CONFIG_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "../","config", "defaults.yaml")
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
def load_config(user_config_path):
"""Load configuration from defaults and merge in user config if present."""
if not os.path.exists(DEFAULT_CONFIG_PATH):
print(f"Default configuration file '{DEFAULT_CONFIG_PATH}' not found.")
sys.exit(5)
with open(DEFAULT_CONFIG_PATH, 'r') as f:
config = yaml.safe_load(f)
if "directories" not in config or "repositories" not in config:
print("Default config file must contain 'directories' and 'repositories' keys.")
sys.exit(6)
if os.path.exists(user_config_path):
with open(user_config_path, 'r') as f:
user_config = yaml.safe_load(f)
if user_config:
if "directories" in user_config:
config["directories"] = user_config["directories"]
if "repositories" in user_config:
config["repositories"].extend(user_config["repositories"])
for repository in config["repositories"]:
# You can overwritte the directory path in the config
if "directory" not in repository:
directory = get_repo_dir(config["directories"]["repositories"], repository)
repository["directory"] = os.path.expanduser(directory)
return config
"""
Load and merge pkgmgr configuration.
Layering rules:
1. Defaults / category files:
- Zuerst werden alle *.yml/*.yaml (außer config.yaml) im
Benutzerverzeichnis geladen:
~/.config/pkgmgr/
- Falls dort keine passenden Dateien existieren, wird auf die im
Paket / Projekt mitgelieferten Config-Verzeichnisse zurückgegriffen:
<pkg_root>/config_defaults
<pkg_root>/config
<project_root>/config_defaults
<project_root>/config
Dabei werden ebenfalls alle *.yml/*.yaml als Layer geladen.
- Der Dateiname ohne Endung (stem) wird als Kategorie-Name
verwendet und in repo["category_files"] eingetragen.
2. User config:
- ~/.config/pkgmgr/config.yaml (oder der übergebene Pfad)
wird geladen und PER LISTEN-MERGE über die Defaults gelegt:
- directories: dict deep-merge
- repositories: per _merge_repo_lists (kein Löschen!)
3. Ergebnis:
- Ein dict mit mindestens:
config["directories"] (dict)
config["repositories"] (list[dict])
"""
from __future__ import annotations
import os
from pathlib import Path
from typing import Any, Dict, List, Tuple
import yaml
Repo = Dict[str, Any]
# ---------------------------------------------------------------------------
# Hilfsfunktionen
# ---------------------------------------------------------------------------
def _deep_merge(base: Dict[str, Any], override: Dict[str, Any]) -> Dict[str, Any]:
"""
Recursively merge two dictionaries.
Values from `override` win over values in `base`.
"""
for key, value in override.items():
if (
key in base
and isinstance(base[key], dict)
and isinstance(value, dict)
):
_deep_merge(base[key], value)
else:
base[key] = value
return base
def _repo_key(repo: Repo) -> Tuple[str, str, str]:
"""
Normalised key for identifying a repository across config files.
"""
return (
str(repo.get("provider", "")),
str(repo.get("account", "")),
str(repo.get("repository", "")),
)
def _merge_repo_lists(
base_list: List[Repo],
new_list: List[Repo],
category_name: str | None = None,
) -> List[Repo]:
"""
Merge two repository lists, matching by (provider, account, repository).
- Wenn ein Repo aus new_list noch nicht existiert, wird es hinzugefügt.
- Wenn es existiert, werden seine Felder per Deep-Merge überschrieben.
- Wenn category_name gesetzt ist, wird dieser in
repo["category_files"] eingetragen.
"""
index: Dict[Tuple[str, str, str], Repo] = {
_repo_key(r): r for r in base_list
}
for src in new_list:
key = _repo_key(src)
if key == ("", "", ""):
# Unvollständiger Schlüssel -> einfach anhängen
dst = dict(src)
if category_name:
dst.setdefault("category_files", [])
if category_name not in dst["category_files"]:
dst["category_files"].append(category_name)
base_list.append(dst)
continue
existing = index.get(key)
if existing is None:
dst = dict(src)
if category_name:
dst.setdefault("category_files", [])
if category_name not in dst["category_files"]:
dst["category_files"].append(category_name)
base_list.append(dst)
index[key] = dst
else:
_deep_merge(existing, src)
if category_name:
existing.setdefault("category_files", [])
if category_name not in existing["category_files"]:
existing["category_files"].append(category_name)
return base_list
def _load_yaml_file(path: Path) -> Dict[str, Any]:
"""
Load a single YAML file as dict. Non-dicts yield {}.
"""
if not path.is_file():
return {}
with path.open("r", encoding="utf-8") as f:
data = yaml.safe_load(f) or {}
if not isinstance(data, dict):
return {}
return data
def _load_layer_dir(
config_dir: Path,
skip_filename: str | None = None,
) -> Dict[str, Any]:
"""
Load all *.yml/*.yaml from a directory as layered defaults.
- skip_filename: Dateiname (z.B. "config.yaml"), der ignoriert
werden soll (z.B. User-Config).
Rückgabe:
{
"directories": {...},
"repositories": [...],
}
"""
defaults: Dict[str, Any] = {"directories": {}, "repositories": []}
if not config_dir.is_dir():
return defaults
yaml_files = [
p
for p in config_dir.iterdir()
if p.is_file()
and p.suffix.lower() in (".yml", ".yaml")
and (skip_filename is None or p.name != skip_filename)
]
if not yaml_files:
return defaults
yaml_files.sort(key=lambda p: p.name)
for path in yaml_files:
data = _load_yaml_file(path)
category_name = path.stem # Dateiname ohne .yml/.yaml
dirs = data.get("directories")
if isinstance(dirs, dict):
defaults.setdefault("directories", {})
_deep_merge(defaults["directories"], dirs)
repos = data.get("repositories")
if isinstance(repos, list):
defaults.setdefault("repositories", [])
_merge_repo_lists(
defaults["repositories"],
repos,
category_name=category_name,
)
return defaults
def _load_defaults_from_package_or_project() -> Dict[str, Any]:
"""
Fallback: Versuche Defaults aus dem installierten Paket ODER
aus dem Projekt-Root zu laden:
<pkg_root>/config_defaults
<pkg_root>/config
<project_root>/config_defaults
<project_root>/config
"""
try:
import pkgmgr # type: ignore
except Exception:
return {"directories": {}, "repositories": []}
pkg_root = Path(pkgmgr.__file__).resolve().parent
project_root = pkg_root.parent
candidates = [
pkg_root / "config_defaults",
pkg_root / "config",
project_root / "config_defaults",
project_root / "config",
]
for cand in candidates:
defaults = _load_layer_dir(cand, skip_filename=None)
if defaults["directories"] or defaults["repositories"]:
return defaults
return {"directories": {}, "repositories": []}
# ---------------------------------------------------------------------------
# Hauptfunktion
# ---------------------------------------------------------------------------
def load_config(user_config_path: str) -> Dict[str, Any]:
"""
Load and merge configuration for pkgmgr.
Schritte:
1. Ermittle ~/.config/pkgmgr/ (oder das Verzeichnis von user_config_path).
2. Lade alle *.yml/*.yaml dort (außer der User-Config selbst) als
Defaults / Kategorie-Layer.
3. Wenn dort nichts gefunden wurde, Fallback auf Paket/Projekt.
4. Lade die User-Config-Datei selbst (falls vorhanden).
5. Merge:
- directories: deep-merge (Defaults <- User)
- repositories: _merge_repo_lists (Defaults <- User)
"""
user_config_path_expanded = os.path.expanduser(user_config_path)
user_cfg_path = Path(user_config_path_expanded)
config_dir = user_cfg_path.parent
if not str(config_dir):
# Fallback, falls jemand nur "config.yaml" übergibt
config_dir = Path(os.path.expanduser("~/.config/pkgmgr"))
config_dir.mkdir(parents=True, exist_ok=True)
user_cfg_name = user_cfg_path.name
# 1+2) Defaults / Kategorie-Layer aus dem User-Verzeichnis
defaults = _load_layer_dir(config_dir, skip_filename=user_cfg_name)
# 3) Falls dort nichts gefunden wurde, Fallback auf Paket/Projekt
if not defaults["directories"] and not defaults["repositories"]:
defaults = _load_defaults_from_package_or_project()
defaults.setdefault("directories", {})
defaults.setdefault("repositories", [])
# 4) User-Config
user_cfg: Dict[str, Any] = {}
if user_cfg_path.is_file():
user_cfg = _load_yaml_file(user_cfg_path)
user_cfg.setdefault("directories", {})
user_cfg.setdefault("repositories", [])
# 5) Merge: directories deep-merge, repositories listen-merge
merged: Dict[str, Any] = {}
# directories
merged["directories"] = {}
_deep_merge(merged["directories"], defaults["directories"])
_deep_merge(merged["directories"], user_cfg["directories"])
# repositories
merged["repositories"] = []
_merge_repo_lists(merged["repositories"], defaults["repositories"], category_name=None)
_merge_repo_lists(merged["repositories"], user_cfg["repositories"], category_name=None)
# andere Top-Level-Keys (falls vorhanden)
other_keys = (set(defaults.keys()) | set(user_cfg.keys())) - {
"directories",
"repositories",
}
for key in other_keys:
base_val = defaults.get(key)
override_val = user_cfg.get(key)
if isinstance(base_val, dict) and isinstance(override_val, dict):
merged[key] = _deep_merge(dict(base_val), override_val)
elif override_val is not None:
merged[key] = override_val
else:
merged[key] = base_val
return merged

View File

@@ -23,15 +23,14 @@ Additional behaviour:
phases:
1) Preview-only run (dry-run).
2) Interactive confirmation, then real release if confirmed.
This confirmation can be skipped with the `-f/--force` flag.
- If `-c/--close` is used and the current branch is not main/master,
This confirmation can be skipped with the `force=True` flag.
- If `close=True` is used and the current branch is not main/master,
the branch will be closed via branch_commands.close_branch() after
a successful release.
"""
from __future__ import annotations
import argparse
import os
import re
import subprocess
@@ -142,7 +141,6 @@ def _open_editor_for_changelog(initial_message: Optional[str] = None) -> str:
encoding="utf-8",
) as tmp:
tmp_path = tmp.name
# Prefill with instructions as comments
tmp.write(
"# Write the changelog entry for this release.\n"
"# Lines starting with '#' will be ignored.\n"
@@ -152,7 +150,6 @@ def _open_editor_for_changelog(initial_message: Optional[str] = None) -> str:
tmp.write(initial_message.strip() + "\n")
tmp.flush()
# Open editor (best-effort; fall back gracefully if not available)
try:
subprocess.call([editor, tmp_path])
except FileNotFoundError:
@@ -161,7 +158,6 @@ def _open_editor_for_changelog(initial_message: Optional[str] = None) -> str:
"interactive changelog message."
)
# Read back content
try:
with open(tmp_path, "r", encoding="utf-8") as f:
content = f.read()
@@ -171,7 +167,6 @@ def _open_editor_for_changelog(initial_message: Optional[str] = None) -> str:
except OSError:
pass
# Filter out commented lines and return joined text
lines = [
line for line in content.splitlines()
if not line.strip().startswith("#")
@@ -197,14 +192,6 @@ def update_pyproject_version(
version = "X.Y.Z"
and replaces the version part with the given new_version string.
It does not try to parse the full TOML structure here. This keeps the
implementation small and robust as long as the version line follows
the standard pattern.
Behaviour:
- In normal mode: write the updated content back to the file.
- In preview mode: do NOT write, only report what would change.
"""
try:
with open(pyproject_path, "r", encoding="utf-8") as f:
@@ -242,13 +229,6 @@ def update_flake_version(
) -> None:
"""
Update the version in flake.nix, if present.
Looks for a line like:
version = "1.2.3";
and replaces the string inside the quotes. If the file does not
exist or no version line is found, this is treated as a non-fatal
condition and only a log message is printed.
"""
if not os.path.exists(flake_path):
print("[INFO] flake.nix not found, skipping.")
@@ -293,13 +273,6 @@ def update_pkgbuild_version(
Expects:
pkgver=1.2.3
pkgrel=1
Behaviour:
- Set pkgver to the new_version (e.g. 1.2.3).
- Reset pkgrel to 1.
If the file does not exist, this is non-fatal and only a log
message is printed.
"""
if not os.path.exists(pkgbuild_path):
print("[INFO] PKGBUILD not found, skipping.")
@@ -312,7 +285,6 @@ def update_pkgbuild_version(
print(f"[WARN] Could not read PKGBUILD: {exc}")
return
# Update pkgver
ver_pattern = r"^(pkgver\s*=\s*)(.+)$"
new_content, ver_count = re.subn(
ver_pattern,
@@ -323,9 +295,8 @@ def update_pkgbuild_version(
if ver_count == 0:
print("[WARN] No pkgver line found in PKGBUILD.")
new_content = content # revert to original if we didn't change anything
new_content = content
# Reset pkgrel to 1
rel_pattern = r"^(pkgrel\s*=\s*)(.+)$"
new_content, rel_count = re.subn(
rel_pattern,
@@ -354,19 +325,6 @@ def update_spec_version(
) -> None:
"""
Update the version in an RPM spec file, if present.
Assumes a file like 'package-manager.spec' with lines:
Version: 1.2.3
Release: 1%{?dist}
Behaviour:
- Set 'Version:' to new_version.
- Reset 'Release:' to '1' while preserving any macro suffix,
e.g. '1%{?dist}'.
If the file does not exist, this is non-fatal and only a log
message is printed.
"""
if not os.path.exists(spec_path):
print("[INFO] RPM spec file not found, skipping.")
@@ -379,7 +337,6 @@ def update_spec_version(
print(f"[WARN] Could not read spec file: {exc}")
return
# Update Version:
ver_pattern = r"^(Version:\s*)(.+)$"
new_content, ver_count = re.subn(
ver_pattern,
@@ -391,12 +348,10 @@ def update_spec_version(
if ver_count == 0:
print("[WARN] No 'Version:' line found in spec file.")
# Reset Release:
rel_pattern = r"^(Release:\s*)(.+)$"
def _release_repl(m: re.Match[str]) -> str: # type: ignore[name-defined]
rest = m.group(2).strip()
# Reset numeric prefix to "1" and keep any suffix (e.g. % macros).
match = re.match(r"^(\d+)(.*)$", rest)
if match:
suffix = match.group(2)
@@ -439,21 +394,11 @@ def update_changelog(
"""
Prepend a new release section to CHANGELOG.md with the new version,
current date, and a message.
Behaviour:
- If message is None and preview is False:
→ open $EDITOR (fallback 'nano') to let the user enter a message.
- If message is None and preview is True:
→ use a generic automated message.
- The resulting changelog entry is printed to stdout.
- Returns the final message text used.
"""
today = date.today().isoformat()
# Resolve message
if message is None:
if preview:
# Do not open editor in preview mode; keep it non-interactive.
message = "Automated release."
else:
print(
@@ -481,7 +426,6 @@ def update_changelog(
new_changelog = header + "\n" + changelog if changelog else header
# Show the entry that will be written
print("\n================ CHANGELOG ENTRY ================")
print(header.rstrip())
print("=================================================\n")
@@ -506,8 +450,6 @@ def update_changelog(
def _get_git_config_value(key: str) -> Optional[str]:
"""
Try to read a value from `git config --get <key>`.
Returns the stripped value or None if not set / on error.
"""
try:
result = subprocess.run(
@@ -526,12 +468,6 @@ def _get_git_config_value(key: str) -> Optional[str]:
def _get_debian_author() -> Tuple[str, str]:
"""
Determine the maintainer name/email for debian/changelog entries.
Priority:
1. DEBFULLNAME / DEBEMAIL
2. GIT_AUTHOR_NAME / GIT_AUTHOR_EMAIL
3. git config user.name / user.email
4. Fallback: 'Unknown Maintainer' / 'unknown@example.com'
"""
name = os.environ.get("DEBFULLNAME")
email = os.environ.get("DEBEMAIL")
@@ -563,12 +499,6 @@ def update_debian_changelog(
) -> None:
"""
Prepend a new entry to debian/changelog, if it exists.
The first line typically looks like:
package-name (1.2.3-1) unstable; urgency=medium
We generate a new stanza at the top with Debian-style version
'X.Y.Z-1'. If the file does not exist, this function does nothing.
"""
if not os.path.exists(debian_changelog_path):
print("[INFO] debian/changelog not found, skipping.")
@@ -576,15 +506,12 @@ def update_debian_changelog(
debian_version = f"{new_version}-1"
now = datetime.now().astimezone()
# Debian-like date string, e.g. "Mon, 08 Dec 2025 12:34:56 +0100"
date_str = now.strftime("%a, %d %b %Y %H:%M:%S %z")
author_name, author_email = _get_debian_author()
first_line = f"{package_name} ({debian_version}) unstable; urgency=medium"
body_line = (
message.strip() if message else f"Automated release {new_version}."
)
body_line = message.strip() if message else f"Automated release {new_version}."
stanza = (
f"{first_line}\n\n"
f" * {body_line}\n\n"
@@ -628,22 +555,8 @@ def _release_impl(
) -> None:
"""
Internal implementation that performs a single-phase release.
If `preview` is True:
- No files are written.
- No git commands are executed.
- Planned actions are printed.
If `preview` is False:
- Files are updated.
- Git commit, tag, and push are executed.
- If `close` is True and the current branch is not main/master,
the branch will be closed after a successful release.
"""
# 1) Determine the current version from Git tags.
current_ver = _determine_current_version()
# 2) Compute the next version.
new_ver = _bump_semver(current_ver, release_type)
new_ver_str = str(new_ver)
new_tag = new_ver.to_tag(with_prefix=True)
@@ -653,12 +566,9 @@ def _release_impl(
print(f"Current version: {current_ver}")
print(f"New version: {new_ver_str} ({release_type})")
# Determine repository root based on pyproject location
repo_root = os.path.dirname(os.path.abspath(pyproject_path))
# 2) Update files.
update_pyproject_version(pyproject_path, new_ver_str, preview=preview)
# Let update_changelog resolve or edit the message; capture it separately.
changelog_message = update_changelog(
changelog_path,
new_ver_str,
@@ -666,7 +576,6 @@ def _release_impl(
preview=preview,
)
# Additional packaging files (non-fatal if missing)
flake_path = os.path.join(repo_root, "flake.nix")
update_flake_version(flake_path, new_ver_str, preview=preview)
@@ -676,14 +585,12 @@ def _release_impl(
spec_path = os.path.join(repo_root, "package-manager.spec")
update_spec_version(spec_path, new_ver_str, preview=preview)
# Determine an effective message for tag & Debian changelog.
effective_message: Optional[str] = message
if effective_message is None and isinstance(changelog_message, str):
if changelog_message.strip():
effective_message = changelog_message.strip()
debian_changelog_path = os.path.join(repo_root, "debian", "changelog")
# Use repo directory name as a simple default for package name
package_name = os.path.basename(repo_root) or "package-manager"
update_debian_changelog(
debian_changelog_path,
@@ -693,7 +600,6 @@ def _release_impl(
preview=preview,
)
# 3) Git operations: stage, commit, tag, push.
commit_msg = f"Release version {new_ver_str}"
tag_msg = effective_message or commit_msg
@@ -703,7 +609,6 @@ def _release_impl(
branch = "main"
print(f"Releasing on branch: {branch}")
# Stage all relevant packaging files so they are included in the commit
files_to_add = [
pyproject_path,
changelog_path,
@@ -725,11 +630,11 @@ def _release_impl(
if close and branch not in ("main", "master"):
print(
f"[PREVIEW] Would also close branch {branch} after the release "
"(--close specified and branch is not main/master)."
"(close=True and branch is not main/master)."
)
elif close:
print(
f"[PREVIEW] --close specified but current branch is {branch}; "
f"[PREVIEW] close=True but current branch is {branch}; "
"no branch would be closed."
)
@@ -746,27 +651,26 @@ def _release_impl(
print(f"Release {new_ver_str} completed.")
# Optional: close the current branch after a successful release.
if close:
if branch in ("main", "master"):
print(
f"[INFO] --close specified but current branch is {branch}; "
f"[INFO] close=True but current branch is {branch}; "
"nothing to close."
)
return
print(
f"[INFO] Closing branch {branch} after successful release "
"(--close enabled and branch is not main/master)..."
"(close=True and branch is not main/master)..."
)
try:
close_branch(name=branch, base_branch="main", cwd=".")
except Exception as exc: # pragma: no cover - defensive
except Exception as exc: # pragma: no cover
print(f"[WARN] Failed to close branch {branch} automatically: {exc}")
# ---------------------------------------------------------------------------
# Public release entry point (with preview-first + confirmation logic)
# Public release entry point
# ---------------------------------------------------------------------------
@@ -786,29 +690,13 @@ def release(
- preview=True:
* Single-phase PREVIEW only.
* No files are changed, no git commands are executed.
* `force` is ignored in this mode.
- preview=False, force=True:
* Single-phase REAL release, no interactive preview.
* Files are changed and git commands are executed immediately.
- preview=False, force=False:
* Two-phase flow (intended default for interactive CLI use):
1) PREVIEW: dry-run, printing all planned actions.
2) Ask the user for confirmation:
"Proceed with the actual release? [y/N]: "
If confirmed, perform the REAL release.
Otherwise, abort without changes.
* In non-interactive environments (stdin not a TTY), the
confirmation step is skipped automatically and a single
REAL phase is executed, to avoid blocking on input().
The `close` flag controls whether the current branch should be
closed after a successful REAL release (only if it is not main/master).
* Two-phase flow (intended default for interactive CLI use).
"""
# Explicit preview mode: just do a single PREVIEW phase and exit.
if preview:
_release_impl(
pyproject_path=pyproject_path,
@@ -820,7 +708,6 @@ def release(
)
return
# Non-preview, but forced: run REAL release directly.
if force:
_release_impl(
pyproject_path=pyproject_path,
@@ -832,7 +719,6 @@ def release(
)
return
# Non-interactive environment? Skip confirmation to avoid blocking.
if not sys.stdin.isatty():
_release_impl(
pyproject_path=pyproject_path,
@@ -844,7 +730,6 @@ def release(
)
return
# Interactive two-phase flow:
print("[INFO] Running preview before actual release...\n")
_release_impl(
pyproject_path=pyproject_path,
@@ -855,7 +740,6 @@ def release(
close=close,
)
# Ask for confirmation
try:
answer = input("Proceed with the actual release? [y/N]: ").strip().lower()
except (EOFError, KeyboardInterrupt):
@@ -875,78 +759,3 @@ def release(
preview=False,
close=close,
)
# ---------------------------------------------------------------------------
# CLI entry point for standalone use
# ---------------------------------------------------------------------------
def _parse_args(argv: Optional[list[str]] = None) -> argparse.Namespace:
parser = argparse.ArgumentParser(description="pkgmgr release helper")
parser.add_argument(
"release_type",
choices=["major", "minor", "patch"],
help="Type of release (major/minor/patch).",
)
parser.add_argument(
"-m",
"--message",
dest="message",
default=None,
help="Release message to use for changelog and tag.",
)
parser.add_argument(
"--pyproject",
dest="pyproject",
default="pyproject.toml",
help="Path to pyproject.toml (default: pyproject.toml)",
)
parser.add_argument(
"--changelog",
dest="changelog",
default="CHANGELOG.md",
help="Path to CHANGELOG.md (default: CHANGELOG.md)",
)
parser.add_argument(
"--preview",
action="store_true",
help=(
"Preview release changes without modifying files or running git. "
"This mode never executes the real release."
),
)
parser.add_argument(
"-f",
"--force",
dest="force",
action="store_true",
help=(
"Skip the interactive preview+confirmation step and run the "
"release directly."
),
)
parser.add_argument(
"-c",
"--close",
dest="close",
action="store_true",
help=(
"Close the current branch after a successful release, "
"if it is not main/master."
),
)
return parser.parse_args(argv)
if __name__ == "__main__":
args = _parse_args()
release(
pyproject_path=args.pyproject,
changelog_path=args.changelog,
release_type=args.release_type,
message=args.message,
preview=args.preview,
force=args.force,
close=args.close,
)

View File

@@ -7,7 +7,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "package-manager"
version = "0.4.1"
version = "0.5.1"
description = "Kevin's package-manager tool (pkgmgr)"
readme = "README.md"
requires-python = ">=3.11"

View File

@@ -0,0 +1,64 @@
from __future__ import annotations
import os
import runpy
import sys
import unittest
from test_integration_version_commands import PROJECT_ROOT
class TestIntegrationListCommands(unittest.TestCase):
"""
Integration tests for `pkgmgr list` with the new selection and
description behaviour.
"""
def _run_pkgmgr(self, args: list[str], cwd: str | None = None) -> None:
cmd_repr = "pkgmgr " + " ".join(args)
original_argv = list(sys.argv)
original_cwd = os.getcwd()
try:
if cwd is not None:
os.chdir(cwd)
# Simulate: pkgmgr <args...>
sys.argv = ["pkgmgr"] + args
try:
runpy.run_module("main", run_name="__main__")
except SystemExit as exc:
code = exc.code if isinstance(exc.code, int) else str(exc.code)
if code != 0:
print()
print(f"[TEST] Command : {cmd_repr}")
print(f"[TEST] Working directory: {os.getcwd()}")
print(f"[TEST] Exit code : {code}")
raise AssertionError(
f"{cmd_repr!r} failed with exit code {code}. "
"Scroll up to inspect the output printed before failure."
) from exc
finally:
os.chdir(original_cwd)
sys.argv = original_argv
def test_list_all_repositories(self) -> None:
"""
`pkgmgr list --all` should successfully print the summary table.
"""
self._run_pkgmgr(["list", "--all"], cwd=PROJECT_ROOT)
def test_list_all_with_description(self) -> None:
"""
`pkgmgr list --all --description` should print the table plus the
detailed section for each repository.
"""
self._run_pkgmgr(["list", "--all", "--description"], cwd=PROJECT_ROOT)
def test_list_with_string_filter(self) -> None:
"""
`pkgmgr list --string pkgmgr` exercises the new string-based
selection logic on top of the defaults + user config.
"""
self._run_pkgmgr(["list", "--string", "pkgmgr"], cwd=PROJECT_ROOT)

View File

@@ -0,0 +1,65 @@
from __future__ import annotations
import os
import runpy
import sys
import unittest
from test_integration_version_commands import PROJECT_ROOT
class TestIntegrationProxyCommands(unittest.TestCase):
"""
Integration tests for proxy commands (e.g. git pull) using the new
selection logic and `--preview` mode so no real changes are made.
"""
def _run_pkgmgr(self, args: list[str], cwd: str | None = None) -> None:
cmd_repr = "pkgmgr " + " ".join(args)
original_argv = list(sys.argv)
original_cwd = os.getcwd()
try:
if cwd is not None:
os.chdir(cwd)
# Simulate: pkgmgr <args...>
sys.argv = ["pkgmgr"] + args
try:
runpy.run_module("main", run_name="__main__")
except SystemExit as exc:
code = exc.code if isinstance(exc.code, int) else str(exc.code)
if code != 0:
print()
print(f"[TEST] Command : {cmd_repr}")
print(f"[TEST] Working directory: {os.getcwd()}")
print(f"[TEST] Exit code : {code}")
raise AssertionError(
f"{cmd_repr!r} failed with exit code {code}. "
"Scroll up to inspect the output printed before failure."
) from exc
finally:
os.chdir(original_cwd)
sys.argv = original_argv
def test_git_pull_preview_for_pkgmgr(self) -> None:
"""
`pkgmgr pull --preview pkgmgr` should go through the proxy layer,
use get_selected_repos() and only print the underlying git pull
command without executing it.
"""
self._run_pkgmgr(
["pull", "--preview", "pkgmgr"],
cwd=PROJECT_ROOT,
)
def test_git_pull_preview_with_string_filter(self) -> None:
"""
`pkgmgr pull --preview --string pkgmgr` exercises the proxy +
filter-only selection path.
"""
self._run_pkgmgr(
["pull", "--preview", "--string", "pkgmgr"],
cwd=PROJECT_ROOT,
)

View File

@@ -4,10 +4,17 @@
"""
End-to-end style integration tests for the `pkgmgr release` CLI command.
These tests exercise the top-level `pkgmgr` entry point by invoking
the module as `__main__` and verifying that the underlying
`pkgmgr.release.release()` function is called with the expected
arguments, in particular the new `close` flag.
These tests exercise the real top-level entry point (main.py) and mock
the high-level helper used by the CLI wiring
(pkgmgr.cli_core.commands.release.run_release) to ensure that argument
parsing and dispatch behave as expected, in particular the new `close`
flag.
The tests simulate real CLI calls like:
pkgmgr release minor --preview --close
by manipulating sys.argv and executing main.py as __main__ via runpy.
"""
from __future__ import annotations
@@ -21,55 +28,110 @@ from unittest.mock import patch
class TestIntegrationReleaseCommand(unittest.TestCase):
"""Integration tests for `pkgmgr release` wiring."""
def _run_pkgmgr(self, argv: list[str]) -> None:
def _run_pkgmgr(self, extra_args: list[str]) -> None:
"""
Helper to invoke the `pkgmgr` console script via `run_module`.
Helper to invoke the `pkgmgr` console script via the real
entry point (main.py).
This simulates a real CLI call like:
pkgmgr release minor --preview --close
pkgmgr <extra_args...>
by setting sys.argv accordingly and executing main.py as
__main__ using runpy.run_module.
"""
original_argv = list(sys.argv)
try:
sys.argv = argv
# Entry point: the `pkgmgr` module is the console script.
runpy.run_module("pkgmgr", run_name="__main__")
# argv[0] is the program name; the rest are CLI arguments.
sys.argv = ["pkgmgr"] + list(extra_args)
runpy.run_module("main", run_name="__main__")
finally:
sys.argv = original_argv
@patch("pkgmgr.release.release")
def test_release_without_close_flag(self, mock_release) -> None:
# ------------------------------------------------------------------
# Behaviour without --close
# ------------------------------------------------------------------
@patch("pkgmgr.cli_core.commands.release.run_release")
@patch("pkgmgr.cli_core.dispatch._select_repo_for_current_directory")
def test_release_without_close_flag(
self,
mock_select_repo,
mock_run_release,
) -> None:
"""
Calling `pkgmgr release patch --preview` should *not* enable
the `close` flag by default.
"""
self._run_pkgmgr(["pkgmgr", "release", "patch", "--preview"])
# Ensure that the dispatch layer always selects a repository,
# independent of any real config in the test environment.
mock_select_repo.return_value = [
{
"directory": ".",
"provider": "local",
"account": "test",
"repository": "dummy",
}
]
mock_release.assert_called_once()
_args, kwargs = mock_release.call_args
self._run_pkgmgr(["release", "patch", "--preview"])
mock_run_release.assert_called_once()
_args, kwargs = mock_run_release.call_args
# CLI wiring
self.assertEqual(kwargs.get("release_type"), "patch")
self.assertTrue(kwargs.get("preview"), "preview should be True when --preview is used")
self.assertTrue(
kwargs.get("preview"),
"preview should be True when --preview is used",
)
# Default: no --close → close=False
self.assertFalse(kwargs.get("close"), "close must be False when --close is not given")
self.assertFalse(
kwargs.get("close"),
"close must be False when --close is not given",
)
@patch("pkgmgr.release.release")
def test_release_with_close_flag(self, mock_release) -> None:
# ------------------------------------------------------------------
# Behaviour with --close
# ------------------------------------------------------------------
@patch("pkgmgr.cli_core.commands.release.run_release")
@patch("pkgmgr.cli_core.dispatch._select_repo_for_current_directory")
def test_release_with_close_flag(
self,
mock_select_repo,
mock_run_release,
) -> None:
"""
Calling `pkgmgr release minor --preview --close` should pass
close=True into pkgmgr.release.release().
close=True into the helper used by the CLI wiring.
"""
self._run_pkgmgr(["pkgmgr", "release", "minor", "--preview", "--close"])
# Again: make sure there is always a selected repository.
mock_select_repo.return_value = [
{
"directory": ".",
"provider": "local",
"account": "test",
"repository": "dummy",
}
]
mock_release.assert_called_once()
_args, kwargs = mock_release.call_args
self._run_pkgmgr(["release", "minor", "--preview", "--close"])
mock_run_release.assert_called_once()
_args, kwargs = mock_run_release.call_args
# CLI wiring
self.assertEqual(kwargs.get("release_type"), "minor")
self.assertTrue(kwargs.get("preview"), "preview should be True when --preview is used")
self.assertTrue(
kwargs.get("preview"),
"preview should be True when --preview is used",
)
# With --close → close=True
self.assertTrue(kwargs.get("close"), "close must be True when --close is given")
self.assertTrue(
kwargs.get("close"),
"close must be True when --close is given",
)
if __name__ == "__main__":

View File

View File

@@ -0,0 +1,206 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Unit tests for pkgmgr.cli_core.commands.release.
These tests focus on the wiring layer:
- Argument handling for the release command as defined by the
top-level parser (cli_core.parser.create_parser).
- Correct invocation of pkgmgr.release.release(...) for the
selected repositories.
- Behaviour of --preview, --list, --close, and -f/--force.
"""
from __future__ import annotations
from types import SimpleNamespace
from typing import List
from unittest.mock import patch, call
import argparse
import unittest
class TestReleaseCommand(unittest.TestCase):
"""
Tests for the `pkgmgr release` CLI wiring.
"""
def _make_ctx(self, all_repos: List[dict]) -> SimpleNamespace:
"""
Create a minimal CLIContext-like object for tests.
Only the attributes that handle_release() uses are provided.
"""
return SimpleNamespace(
config_merged={},
repositories_base_dir="/base/dir",
all_repositories=all_repos,
binaries_dir="/bin",
user_config_path="/tmp/config.yaml",
)
def _parse_release_args(self, argv: List[str]) -> argparse.Namespace:
"""
Build a real top-level parser and parse the given argv list
to obtain the Namespace for the `release` command.
"""
from pkgmgr.cli_core.parser import create_parser
parser = create_parser("test parser")
args = parser.parse_args(argv)
self.assertEqual(args.command, "release")
return args
@patch("pkgmgr.cli_core.commands.release.os.path.isdir", return_value=True)
@patch("pkgmgr.cli_core.commands.release.run_release")
@patch("pkgmgr.cli_core.commands.release.get_repo_dir")
@patch("pkgmgr.cli_core.commands.release.get_repo_identifier")
@patch("pkgmgr.cli_core.commands.release.os.chdir")
@patch("pkgmgr.cli_core.commands.release.os.getcwd", return_value="/cwd")
def test_release_with_close_and_message(
self,
mock_getcwd,
mock_chdir,
mock_get_repo_identifier,
mock_get_repo_dir,
mock_run_release,
mock_isdir,
) -> None:
"""
The release handler should call pkgmgr.release.release() with:
- release_type (e.g. minor)
- provided message
- preview flag
- force flag
- close flag
It must change into the repository directory and then back.
"""
from pkgmgr.cli_core.commands.release import handle_release
repo = {"name": "dummy-repo"}
selected = [repo]
ctx = self._make_ctx(selected)
mock_get_repo_identifier.return_value = "dummy-id"
mock_get_repo_dir.return_value = "/repos/dummy"
argv = [
"release",
"minor",
"dummy-id",
"-m",
"Close branch after minor release",
"--close",
"-f",
]
args = self._parse_release_args(argv)
handle_release(args, ctx, selected)
# We should have changed into the repo dir and then back.
mock_chdir.assert_has_calls(
[call("/repos/dummy"), call("/cwd")]
)
# And run_release should be invoked once with the expected parameters.
mock_run_release.assert_called_once_with(
pyproject_path="pyproject.toml",
changelog_path="CHANGELOG.md",
release_type="minor",
message="Close branch after minor release",
preview=False,
force=True,
close=True,
)
@patch("pkgmgr.cli_core.commands.release.os.path.isdir", return_value=True)
@patch("pkgmgr.cli_core.commands.release.run_release")
@patch("pkgmgr.cli_core.commands.release.get_repo_dir")
@patch("pkgmgr.cli_core.commands.release.get_repo_identifier")
@patch("pkgmgr.cli_core.commands.release.os.chdir")
@patch("pkgmgr.cli_core.commands.release.os.getcwd", return_value="/cwd")
def test_release_preview_mode(
self,
mock_getcwd,
mock_chdir,
mock_get_repo_identifier,
mock_get_repo_dir,
mock_run_release,
mock_isdir,
) -> None:
"""
In preview mode, the handler should pass preview=True to the
release helper and force=False by default.
"""
from pkgmgr.cli_core.commands.release import handle_release
repo = {"name": "dummy-repo"}
selected = [repo]
ctx = self._make_ctx(selected)
mock_get_repo_identifier.return_value = "dummy-id"
mock_get_repo_dir.return_value = "/repos/dummy"
argv = [
"release",
"patch",
"dummy-id",
"--preview",
]
args = self._parse_release_args(argv)
handle_release(args, ctx, selected)
mock_run_release.assert_called_once_with(
pyproject_path="pyproject.toml",
changelog_path="CHANGELOG.md",
release_type="patch",
message=None,
preview=True,
force=False,
close=False,
)
@patch("pkgmgr.cli_core.commands.release.run_release")
@patch("pkgmgr.cli_core.commands.release.get_repo_dir")
@patch("pkgmgr.cli_core.commands.release.get_repo_identifier")
def test_release_list_mode_does_not_invoke_helper(
self,
mock_get_repo_identifier,
mock_get_repo_dir,
mock_run_release,
) -> None:
"""
When --list is provided, the handler should print the list of affected
repositories and must NOT invoke run_release().
"""
from pkgmgr.cli_core.commands.release import handle_release
repo1 = {"name": "repo-1"}
repo2 = {"name": "repo-2"}
selected = [repo1, repo2]
ctx = self._make_ctx(selected)
mock_get_repo_identifier.side_effect = ["id-1", "id-2"]
argv = [
"release",
"major",
"--list",
]
args = self._parse_release_args(argv)
handle_release(args, ctx, selected)
mock_run_release.assert_not_called()
self.assertEqual(
mock_get_repo_identifier.call_args_list,
[call(repo1, selected), call(repo2, selected)],
)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,112 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Unit tests for the `pkgmgr branch` CLI wiring.
These tests verify that:
- The argument parser creates the correct structure for
`branch open` and `branch close`.
- `handle_branch` calls the corresponding helper functions
with the expected arguments (including base branch and cwd).
"""
from __future__ import annotations
import unittest
from unittest.mock import patch
from pkgmgr.cli_core.parser import create_parser
from pkgmgr.cli_core.commands.branch import handle_branch
class TestBranchCLI(unittest.TestCase):
"""
Tests for the branch subcommands implemented in cli_core.
"""
def _create_parser(self):
"""
Create the top-level parser with a minimal description.
"""
return create_parser("pkgmgr test parser")
@patch("pkgmgr.cli_core.commands.branch.open_branch")
def test_branch_open_with_name_and_base(self, mock_open_branch):
"""
Ensure that `pkgmgr branch open <name> --base <branch>` calls
open_branch() with the correct parameters.
"""
parser = self._create_parser()
args = parser.parse_args(
["branch", "open", "feature/test-branch", "--base", "develop"]
)
# Sanity check: parser wiring
self.assertEqual(args.command, "branch")
self.assertEqual(args.subcommand, "open")
self.assertEqual(args.name, "feature/test-branch")
self.assertEqual(args.base, "develop")
# ctx is currently unused by handle_branch, so we can pass None
handle_branch(args, ctx=None)
mock_open_branch.assert_called_once()
_args, kwargs = mock_open_branch.call_args
self.assertEqual(kwargs.get("name"), "feature/test-branch")
self.assertEqual(kwargs.get("base_branch"), "develop")
self.assertEqual(kwargs.get("cwd"), ".")
@patch("pkgmgr.cli_core.commands.branch.close_branch")
def test_branch_close_with_name_and_base(self, mock_close_branch):
"""
Ensure that `pkgmgr branch close <name> --base <branch>` calls
close_branch() with the correct parameters.
"""
parser = self._create_parser()
args = parser.parse_args(
["branch", "close", "feature/old-branch", "--base", "main"]
)
# Sanity check: parser wiring
self.assertEqual(args.command, "branch")
self.assertEqual(args.subcommand, "close")
self.assertEqual(args.name, "feature/old-branch")
self.assertEqual(args.base, "main")
handle_branch(args, ctx=None)
mock_close_branch.assert_called_once()
_args, kwargs = mock_close_branch.call_args
self.assertEqual(kwargs.get("name"), "feature/old-branch")
self.assertEqual(kwargs.get("base_branch"), "main")
self.assertEqual(kwargs.get("cwd"), ".")
@patch("pkgmgr.cli_core.commands.branch.close_branch")
def test_branch_close_without_name_uses_none(self, mock_close_branch):
"""
Ensure that `pkgmgr branch close` without a name passes name=None
into close_branch(), leaving branch resolution to the helper.
"""
parser = self._create_parser()
args = parser.parse_args(["branch", "close"])
# Parser wiring: no name → None
self.assertEqual(args.command, "branch")
self.assertEqual(args.subcommand, "close")
self.assertIsNone(args.name)
handle_branch(args, ctx=None)
mock_close_branch.assert_called_once()
_args, kwargs = mock_close_branch.call_args
self.assertIsNone(kwargs.get("name"))
self.assertEqual(kwargs.get("base_branch"), "main")
self.assertEqual(kwargs.get("cwd"), ".")
if __name__ == "__main__":
unittest.main()

View File

@@ -42,7 +42,7 @@ def _fake_config() -> Dict[str, Any]:
"workspaces": "/tmp/pkgmgr-workspaces",
},
# The actual list of repositories is not used directly by the tests,
# because we mock get_selected_repos(). It must exist, though.
# because we mock the selection logic. It must exist, though.
"repositories": [],
}
@@ -54,8 +54,9 @@ class TestCliVersion(unittest.TestCase):
Each test:
- Runs in a temporary working directory.
- Uses a fake configuration via load_config().
- Uses a mocked get_selected_repos() that returns a single repo
pointing to the temporary directory.
- Uses the same selection logic as the new CLI:
* dispatch_command() calls _select_repo_for_current_directory()
when there is no explicit selection.
"""
def setUp(self) -> None:
@@ -64,35 +65,30 @@ class TestCliVersion(unittest.TestCase):
self._old_cwd = os.getcwd()
os.chdir(self._tmp_dir.name)
# Define a fake repo pointing to our temp dir
self._fake_repo = {
"provider": "github.com",
"account": "test",
"repository": "pkgmgr-test",
"directory": self._tmp_dir.name,
}
# Patch load_config so cli.main() does not read real config files
self._patch_load_config = mock.patch(
"pkgmgr.cli.load_config", return_value=_fake_config()
)
self.mock_load_config = self._patch_load_config.start()
# Patch get_selected_repos so that 'version' operates on our temp dir.
# In the new modular CLI this function is used inside
# pkgmgr.cli_core.dispatch, so we patch it there.
def _fake_selected_repos(
all_flag: bool,
repos: List[dict],
identifiers: List[str],
):
# We always return exactly one "repository" whose directory is the temp dir.
return [
{
"provider": "github.com",
"account": "test",
"repository": "pkgmgr-test",
"directory": self._tmp_dir.name,
}
]
self._patch_get_selected_repos = mock.patch(
"pkgmgr.cli_core.dispatch.get_selected_repos",
side_effect=_fake_selected_repos,
# Patch the "current directory" selection used by dispatch_command().
# This matches the new behaviour: without explicit identifiers,
# version uses _select_repo_for_current_directory(ctx).
self._patch_select_repo_for_current_directory = mock.patch(
"pkgmgr.cli_core.dispatch._select_repo_for_current_directory",
return_value=[self._fake_repo],
)
self.mock_select_repo_for_current_directory = (
self._patch_select_repo_for_current_directory.start()
)
self.mock_get_selected_repos = self._patch_get_selected_repos.start()
# Keep a reference to the original sys.argv, so we can restore it
self._old_argv = list(sys.argv)
@@ -102,7 +98,7 @@ class TestCliVersion(unittest.TestCase):
sys.argv = self._old_argv
# Stop all patches
self._patch_get_selected_repos.stop()
self._patch_select_repo_for_current_directory.stop()
self._patch_load_config.stop()
# Restore working directory
@@ -228,7 +224,7 @@ class TestCliVersion(unittest.TestCase):
# Arrange: pyproject.toml exists
self._write_pyproject("0.0.1")
# Arrange: no tags returned (again: patch handle_version's get_tags)
# Arrange: no tags returned
with mock.patch(
"pkgmgr.cli_core.commands.version.get_tags",
return_value=[],

View File

@@ -365,6 +365,7 @@ class TestUpdateDebianChangelog(unittest.TestCase):
class TestReleaseOrchestration(unittest.TestCase):
@patch("pkgmgr.release.sys.stdin.isatty", return_value=False)
@patch("pkgmgr.release._run_git_command")
@patch("pkgmgr.release.update_debian_changelog")
@patch("pkgmgr.release.update_spec_version")
@@ -387,6 +388,7 @@ class TestReleaseOrchestration(unittest.TestCase):
mock_update_spec,
mock_update_debian_changelog,
mock_run_git_command,
mock_isatty,
) -> None:
mock_determine_current_version.return_value = SemVer(1, 2, 3)
mock_bump_semver.return_value = SemVer(1, 2, 4)
@@ -449,6 +451,7 @@ class TestReleaseOrchestration(unittest.TestCase):
self.assertIn("git push origin develop", git_calls)
self.assertIn("git push origin --tags", git_calls)
@patch("pkgmgr.release.sys.stdin.isatty", return_value=False)
@patch("pkgmgr.release._run_git_command")
@patch("pkgmgr.release.update_debian_changelog")
@patch("pkgmgr.release.update_spec_version")
@@ -471,6 +474,7 @@ class TestReleaseOrchestration(unittest.TestCase):
mock_update_spec,
mock_update_debian_changelog,
mock_run_git_command,
mock_isatty,
) -> None:
mock_determine_current_version.return_value = SemVer(1, 2, 3)
mock_bump_semver.return_value = SemVer(1, 2, 4)