Compare commits

...

6 Commits

Author SHA1 Message Date
Kevin Veen-Birkenbach
7d73007181 Release version 0.5.1 2025-12-09 01:21:31 +01:00
Kevin Veen-Birkenbach
c8462fefa4 Release version 0.5.0 2025-12-09 00:44:16 +01:00
Kevin Veen-Birkenbach
00a1f373ce Merge branch 'feature/config_v2.0' 2025-12-09 00:29:19 +01:00
Kevin Veen-Birkenbach
9f9f2e68c0 Release version 0.4.3 2025-12-09 00:29:08 +01:00
Kevin Veen-Birkenbach
d25dcb05e4 Merge branch 'feature/branch_close' 2025-12-09 00:03:56 +01:00
Kevin Veen-Birkenbach
8ea7ff23e9 Release version 0.3.0 2025-12-08 22:40:50 +01:00
22 changed files with 1848 additions and 362 deletions


@@ -1,3 +1,18 @@
## [0.5.1] - 2025-12-09
* Refine pkgmgr release CLI close wiring and integration tests for --close flag (ChatGPT: https://chatgpt.com/share/69376b4e-8440-800f-9d06-535ec1d7a40e)
## [0.5.0] - 2025-12-09
* Add pkgmgr branch close subcommand, extend CLI parser wiring, and add unit tests for branch handling and version-selection logic (see ChatGPT conversation: https://chatgpt.com/share/693762a3-9ea8-800f-a640-bc78170953d1)
## [0.4.3] - 2025-12-09
* Implement current-directory repository selection for release and proxy commands, unify selection semantics across CLI layers, extend release workflow with --close, integrate branch closing logic, fix wiring for get_repo_identifier/get_repo_dir, update packaging files (PKGBUILD, spec, flake.nix, pyproject), and add comprehensive unit/e2e tests for release and branch commands (see ChatGPT conversation: https://chatgpt.com/share/69375cfe-9e00-800f-bd65-1bd5937e1696)
## [0.4.2] - 2025-12-09
* Wire pkgmgr release CLI to new helper and add unit tests (see ChatGPT conversation: https://chatgpt.com/share/69374f09-c760-800f-92e4-5b44a4510b62)
@@ -12,6 +27,17 @@
* Add branch closing helper and --close flag to release command, including CLI wiring and tests (see https://chatgpt.com/share/69374aec-74ec-800f-bde3-5d91dfdb9b91)
## [0.3.0] - 2025-12-08
* Massive refactor and feature expansion:
- Complete rewrite of config loading system (layered defaults + user config)
- New selection engine (--string, --category, --tag)
- Overhauled list output (colored statuses, alias highlight)
- New config update logic + default YAML sync
- Improved proxy command handling
- Full CLI routing refactor
- Expanded E2E tests for list, proxy, and selection logic
Conversation: https://chatgpt.com/share/693745c3-b8d8-800f-aa29-c8481a2ffae1
## [0.2.0] - 2025-12-08
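The 0.3.0 entry above mentions a rewritten config loading system with "layered defaults + user config". The core idea — shipped defaults overlaid by user values — can be sketched as a recursive dict merge. This is an illustrative, self-contained sketch; the function and key names are assumptions, not pkgmgr's actual API:

```python
from typing import Any, Dict


def merge_layers(defaults: Dict[str, Any], user: Dict[str, Any]) -> Dict[str, Any]:
    """Return defaults overlaid with user values; nested dicts merge recursively."""
    merged = dict(defaults)
    for key, value in user.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = merge_layers(merged[key], value)
        else:
            merged[key] = value
    return merged


# Hypothetical layers: the user overrides one directory, the other default survives.
defaults = {"directories": {"repositories": "~/Repositories", "binaries": "~/.local/bin"}}
user = {"directories": {"repositories": "~/src"}}
print(merge_layers(defaults, user))
# {'directories': {'repositories': '~/src', 'binaries': '~/.local/bin'}}
```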


@@ -1,7 +1,7 @@
# Maintainer: Kevin Veen-Birkenbach <info@veen.world>
pkgname=package-manager
pkgver=0.4.2
pkgver=0.5.1
pkgrel=1
pkgdesc="Local-flake wrapper for Kevin's package-manager (Nix-based)."
arch=('any')

config/wip.yml (new file)

@@ -0,0 +1,7 @@
- account: kevinveenbirkenbach
alias: gkfdrtdtcntr
provider: github.com
repository: federated-to-central-social-network-bridge
verified:
gpg_keys:
- 44D8F11FD62F878E

debian/changelog (vendored)

@@ -1,3 +1,21 @@
package-manager (0.5.1-1) unstable; urgency=medium
* Refine pkgmgr release CLI close wiring and integration tests for --close flag (ChatGPT: https://chatgpt.com/share/69376b4e-8440-800f-9d06-535ec1d7a40e)
-- Kevin Veen-Birkenbach <kevin@veen.world> Tue, 09 Dec 2025 01:21:31 +0100
package-manager (0.5.0-1) unstable; urgency=medium
* Add pkgmgr branch close subcommand, extend CLI parser wiring, and add unit tests for branch handling and version-selection logic (see ChatGPT conversation: https://chatgpt.com/share/693762a3-9ea8-800f-a640-bc78170953d1)
-- Kevin Veen-Birkenbach <kevin@veen.world> Tue, 09 Dec 2025 00:44:16 +0100
package-manager (0.4.3-1) unstable; urgency=medium
* Implement current-directory repository selection for release and proxy commands, unify selection semantics across CLI layers, extend release workflow with --close, integrate branch closing logic, fix wiring for get_repo_identifier/get_repo_dir, update packaging files (PKGBUILD, spec, flake.nix, pyproject), and add comprehensive unit/e2e tests for release and branch commands (see ChatGPT conversation: https://chatgpt.com/share/69375cfe-9e00-800f-bd65-1bd5937e1696)
-- Kevin Veen-Birkenbach <kevin@veen.world> Tue, 09 Dec 2025 00:29:08 +0100
package-manager (0.4.2-1) unstable; urgency=medium
* Wire pkgmgr release CLI to new helper and add unit tests (see ChatGPT conversation: https://chatgpt.com/share/69374f09-c760-800f-92e4-5b44a4510b62)
@@ -16,6 +34,20 @@ package-manager (0.4.0-1) unstable; urgency=medium
-- Kevin Veen-Birkenbach <kevin@veen.world> Mon, 08 Dec 2025 23:02:43 +0100
package-manager (0.3.0-1) unstable; urgency=medium
* Massive refactor and feature expansion:
- Complete rewrite of config loading system (layered defaults + user config)
- New selection engine (--string, --category, --tag)
- Overhauled list output (colored statuses, alias highlight)
- New config update logic + default YAML sync
- Improved proxy command handling
- Full CLI routing refactor
- Expanded E2E tests for list, proxy, and selection logic
Conversation: https://chatgpt.com/share/693745c3-b8d8-800f-aa29-c8481a2ffae1
-- Kevin Veen-Birkenbach <kevin@veen.world> Mon, 08 Dec 2025 22:40:49 +0100
package-manager (0.2.0-1) unstable; urgency=medium
* Add preview-first release workflow and extended packaging support (see ChatGPT conversation: https://chatgpt.com/share/693722b4-af9c-800f-bccc-8a4036e99630)


@@ -31,7 +31,7 @@
rec {
pkgmgr = pyPkgs.buildPythonApplication {
pname = "package-manager";
version = "0.4.2";
version = "0.5.1";
# Use the git repo as source
src = ./.;


@@ -1,5 +1,5 @@
Name: package-manager
Version: 0.4.2
Version: 0.5.1
Release: 1%{?dist}
Summary: Wrapper that runs Kevin's package-manager via Nix flake


@@ -9,9 +9,9 @@ import sys
from pkgmgr.load_config import load_config
from pkgmgr.cli_core import CLIContext, create_parser, dispatch_command
# Define configuration file paths.
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
USER_CONFIG_PATH = os.path.join(PROJECT_ROOT, "config", "config.yaml")
# User config lives in the home directory:
# ~/.config/pkgmgr/config.yaml
USER_CONFIG_PATH = os.path.expanduser("~/.config/pkgmgr/config.yaml")
DESCRIPTION_TEXT = """\
\033[1;32mPackage Manager 🤖📦\033[0m
@@ -63,20 +63,31 @@ For detailed help on each command, use:
def main() -> None:
# Load merged configuration
"""
Entry point for the pkgmgr CLI.
"""
config_merged = load_config(USER_CONFIG_PATH)
repositories_base_dir = os.path.expanduser(
config_merged["directories"]["repositories"]
# Directories: be robust and provide sane defaults if missing
directories = config_merged.get("directories") or {}
repositories_dir = os.path.expanduser(
directories.get("repositories", "~/Repositories")
)
binaries_dir = os.path.expanduser(
config_merged["directories"]["binaries"]
directories.get("binaries", "~/.local/bin")
)
all_repositories = config_merged["repositories"]
# Ensure the merged config actually contains the resolved directories
config_merged.setdefault("directories", {})
config_merged["directories"]["repositories"] = repositories_dir
config_merged["directories"]["binaries"] = binaries_dir
all_repositories = config_merged.get("repositories", [])
ctx = CLIContext(
config_merged=config_merged,
repositories_base_dir=repositories_base_dir,
repositories_base_dir=repositories_dir,
all_repositories=all_repositories,
binaries_dir=binaries_dir,
user_config_path=USER_CONFIG_PATH,
@@ -85,7 +96,6 @@ def main() -> None:
parser = create_parser(DESCRIPTION_TEXT)
args = parser.parse_args()
# If no subcommand is provided, show help
if not getattr(args, "command", None):
parser.print_help()
return


@@ -1,8 +1,13 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import os
import sys
from typing import Any, Dict, List
import shutil
from pathlib import Path
from typing import Any, Dict
import yaml
@@ -17,29 +22,103 @@ from pkgmgr.run_command import run_command
def _load_user_config(user_config_path: str) -> Dict[str, Any]:
"""
Load the user config file, returning a default structure if it does not exist.
Load the user config from ~/.config/pkgmgr/config.yaml
(or whatever ctx.user_config_path is), creating the directory if needed.
"""
if os.path.exists(user_config_path):
with open(user_config_path, "r") as f:
user_config_path_expanded = os.path.expanduser(user_config_path)
cfg_dir = os.path.dirname(user_config_path_expanded)
if cfg_dir and not os.path.isdir(cfg_dir):
os.makedirs(cfg_dir, exist_ok=True)
if os.path.exists(user_config_path_expanded):
with open(user_config_path_expanded, "r", encoding="utf-8") as f:
return yaml.safe_load(f) or {"repositories": []}
return {"repositories": []}
def _find_defaults_source_dir() -> str | None:
"""
Find the directory inside the installed pkgmgr package OR the
project root that contains default config files.
Preferred locations (in this order):
- <pkg_root>/config_defaults
- <pkg_root>/config
- <project_root>/config_defaults
- <project_root>/config
"""
import pkgmgr # local import to avoid circular deps
pkg_root = Path(pkgmgr.__file__).resolve().parent
project_root = pkg_root.parent
candidates = [
pkg_root / "config_defaults",
pkg_root / "config",
project_root / "config_defaults",
project_root / "config",
]
for cand in candidates:
if cand.is_dir():
return str(cand)
return None
def _update_default_configs(user_config_path: str) -> None:
"""
Copy all default *.yml/*.yaml files from the installed pkgmgr package
into ~/.config/pkgmgr/, overwriting existing ones except the user
config file itself (config.yaml), which is never touched.
"""
source_dir = _find_defaults_source_dir()
if not source_dir:
print(
"[WARN] No config_defaults or config directory found in "
"pkgmgr installation. Nothing to update."
)
return
dest_dir = os.path.dirname(os.path.expanduser(user_config_path))
if not dest_dir:
dest_dir = os.path.expanduser("~/.config/pkgmgr")
os.makedirs(dest_dir, exist_ok=True)
for name in os.listdir(source_dir):
lower = name.lower()
if not (lower.endswith(".yml") or lower.endswith(".yaml")):
continue
if name == "config.yaml":
# Never overwrite the user config template / live config
continue
src = os.path.join(source_dir, name)
dst = os.path.join(dest_dir, name)
shutil.copy2(src, dst)
print(f"[INFO] Updated default config file: {dst}")
def handle_config(args, ctx: CLIContext) -> None:
"""
Handle the 'config' command and its subcommands.
Handle 'pkgmgr config' subcommands.
"""
user_config_path = ctx.user_config_path
# --------------------------------------------------------
# ------------------------------------------------------------
# config show
# --------------------------------------------------------
# ------------------------------------------------------------
if args.subcommand == "show":
if args.all or (not args.identifiers):
# Full merged config view
show_config([], user_config_path, full_config=True)
else:
selected = resolve_repos(args.identifiers, ctx.all_repositories)
# Show only matching entries from user config
user_config = _load_user_config(user_config_path)
selected = resolve_repos(
args.identifiers,
user_config.get("repositories", []),
)
if selected:
show_config(
selected,
@@ -48,23 +127,23 @@ def handle_config(args, ctx: CLIContext) -> None:
)
return
# --------------------------------------------------------
# ------------------------------------------------------------
# config add
# --------------------------------------------------------
# ------------------------------------------------------------
if args.subcommand == "add":
interactive_add(ctx.config_merged, user_config_path)
return
# --------------------------------------------------------
# ------------------------------------------------------------
# config edit
# --------------------------------------------------------
# ------------------------------------------------------------
if args.subcommand == "edit":
run_command(f"nano {user_config_path}")
return
# --------------------------------------------------------
# ------------------------------------------------------------
# config init
# --------------------------------------------------------
# ------------------------------------------------------------
if args.subcommand == "init":
user_config = _load_user_config(user_config_path)
config_init(
@@ -75,14 +154,17 @@ def handle_config(args, ctx: CLIContext) -> None:
)
return
# --------------------------------------------------------
# ------------------------------------------------------------
# config delete
# --------------------------------------------------------
# ------------------------------------------------------------
if args.subcommand == "delete":
user_config = _load_user_config(user_config_path)
if args.all or not args.identifiers:
print("You must specify identifiers to delete.")
print(
"[ERROR] 'config delete' requires explicit identifiers. "
"Use 'config show' to inspect entries."
)
return
to_delete = resolve_repos(
@@ -99,14 +181,17 @@ def handle_config(args, ctx: CLIContext) -> None:
print(f"Deleted {len(to_delete)} entries from user config.")
return
# --------------------------------------------------------
# ------------------------------------------------------------
# config ignore
# --------------------------------------------------------
# ------------------------------------------------------------
if args.subcommand == "ignore":
user_config = _load_user_config(user_config_path)
if args.all or not args.identifiers:
print("You must specify identifiers to modify ignore flag.")
print(
"[ERROR] 'config ignore' requires explicit identifiers. "
"Use 'config show' to inspect entries."
)
return
to_modify = resolve_repos(
@@ -135,6 +220,21 @@ def handle_config(args, ctx: CLIContext) -> None:
save_user_config(user_config, user_config_path)
return
# If we end up here, something is wrong with subcommand routing
# ------------------------------------------------------------
# config update
# ------------------------------------------------------------
if args.subcommand == "update":
"""
Copy default YAML configs from the installed package into the
user's ~/.config/pkgmgr directory.
This will overwrite files with the same name (except config.yaml).
"""
_update_default_configs(user_config_path)
return
# ------------------------------------------------------------
# Unknown subcommand
# ------------------------------------------------------------
print(f"Unknown config subcommand: {args.subcommand}")
sys.exit(2)
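The 'config update' path above copies default `*.yml`/`*.yaml` files into the user config directory while never overwriting `config.yaml`. That copy rule can be exercised standalone; temp directories stand in for the installed package and `~/.config/pkgmgr/` in this sketch, which is an illustration rather than the shipped helper:

```python
import os
import shutil
import tempfile


def copy_defaults(src_dir: str, dst_dir: str) -> list:
    """Copy *.yml/*.yaml from src_dir to dst_dir, skipping config.yaml."""
    copied = []
    os.makedirs(dst_dir, exist_ok=True)
    for name in os.listdir(src_dir):
        if not name.lower().endswith((".yml", ".yaml")):
            continue
        if name == "config.yaml":  # the user's live config is never touched
            continue
        shutil.copy2(os.path.join(src_dir, name), os.path.join(dst_dir, name))
        copied.append(name)
    return sorted(copied)


# Hypothetical package defaults: one category file, the live config, a stray README.
src = tempfile.mkdtemp()
dst = tempfile.mkdtemp()
for n in ("github.yml", "config.yaml", "README.md"):
    open(os.path.join(src, n), "w").close()
print(copy_defaults(src, dst))
# ['github.yml']
```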


@@ -1,3 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import sys
@@ -12,7 +15,7 @@ from pkgmgr.status_repos import status_repos
from pkgmgr.list_repositories import list_repositories
from pkgmgr.run_command import run_command
from pkgmgr.create_repo import create_repo
from pkgmgr.get_selected_repos import get_selected_repos
Repository = Dict[str, Any]
@@ -23,15 +26,12 @@ def handle_repos_command(
selected: List[Repository],
) -> None:
"""
Handle repository-related commands:
- install / update / deinstall / delete / status
- path / shell
- create / list
Handle core repository commands (install/update/deinstall/delete/.../list).
"""
# --------------------------------------------------------
# install / update
# --------------------------------------------------------
# ------------------------------------------------------------
# install
# ------------------------------------------------------------
if args.command == "install":
install_repos(
selected,
@@ -46,6 +46,9 @@ def handle_repos_command(
)
return
# ------------------------------------------------------------
# update
# ------------------------------------------------------------
if args.command == "update":
update_repos(
selected,
@@ -61,9 +64,9 @@ def handle_repos_command(
)
return
# --------------------------------------------------------
# deinstall / delete
# --------------------------------------------------------
# ------------------------------------------------------------
# deinstall
# ------------------------------------------------------------
if args.command == "deinstall":
deinstall_repos(
selected,
@@ -74,6 +77,9 @@ def handle_repos_command(
)
return
# ------------------------------------------------------------
# delete
# ------------------------------------------------------------
if args.command == "delete":
delete_repos(
selected,
@@ -83,9 +89,9 @@ def handle_repos_command(
)
return
# --------------------------------------------------------
# ------------------------------------------------------------
# status
# --------------------------------------------------------
# ------------------------------------------------------------
if args.command == "status":
status_repos(
selected,
@@ -98,20 +104,20 @@ def handle_repos_command(
)
return
# --------------------------------------------------------
# ------------------------------------------------------------
# path
# --------------------------------------------------------
# ------------------------------------------------------------
if args.command == "path":
for repository in selected:
print(repository["directory"])
return
# --------------------------------------------------------
# ------------------------------------------------------------
# shell
# --------------------------------------------------------
# ------------------------------------------------------------
if args.command == "shell":
if not args.shell_command:
print("No shell command specified.")
print("[ERROR] 'shell' requires a command via -c/--command.")
sys.exit(2)
command_to_run = " ".join(args.shell_command)
for repository in selected:
@@ -125,13 +131,13 @@ def handle_repos_command(
)
return
# --------------------------------------------------------
# ------------------------------------------------------------
# create
# --------------------------------------------------------
# ------------------------------------------------------------
if args.command == "create":
if not args.identifiers:
print(
"No identifiers provided. Please specify at least one identifier "
"[ERROR] 'create' requires at least one identifier "
"in the format provider/account/repository."
)
sys.exit(1)
@@ -147,15 +153,19 @@ def handle_repos_command(
)
return
# --------------------------------------------------------
# ------------------------------------------------------------
# list
# --------------------------------------------------------
# ------------------------------------------------------------
if args.command == "list":
list_repositories(
ctx.all_repositories,
selected,
ctx.repositories_base_dir,
ctx.binaries_dir,
search_filter=args.search,
status_filter=args.status,
status_filter=getattr(args, "status", "") or "",
extra_tags=getattr(args, "tag", []) or [],
show_description=getattr(args, "description", False),
)
return
print(f"[ERROR] Unknown repos command: {args.command}")
sys.exit(2)


@@ -1,11 +1,16 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import os
import sys
from typing import List
from typing import List, Dict, Any
from pkgmgr.cli_core.context import CLIContext
from pkgmgr.cli_core.proxy import maybe_handle_proxy
from pkgmgr.get_selected_repos import get_selected_repos
from pkgmgr.get_repo_dir import get_repo_dir
from pkgmgr.cli_core.commands import (
handle_repos_command,
@@ -19,20 +24,73 @@ from pkgmgr.cli_core.commands import (
)
def _has_explicit_selection(args) -> bool:
"""
Return True if the user explicitly selected repositories via
identifiers / --all / --category / --tag / --string.
"""
identifiers = getattr(args, "identifiers", []) or []
use_all = getattr(args, "all", False)
categories = getattr(args, "category", []) or []
tags = getattr(args, "tag", []) or []
string_filter = getattr(args, "string", "") or ""
return bool(
use_all
or identifiers
or categories
or tags
or string_filter
)
def _select_repo_for_current_directory(
ctx: CLIContext,
) -> List[Dict[str, Any]]:
"""
Heuristic: find the repository whose local directory matches the
current working directory or is the closest parent.
Example:
- Repo directory: /home/kevin/Repositories/foo
- CWD: /home/kevin/Repositories/foo/subdir
'foo' is selected.
"""
cwd = os.path.abspath(os.getcwd())
candidates: List[tuple[str, Dict[str, Any]]] = []
for repo in ctx.all_repositories:
repo_dir = repo.get("directory")
if not repo_dir:
try:
repo_dir = get_repo_dir(ctx.repositories_base_dir, repo)
except Exception:
repo_dir = None
if not repo_dir:
continue
repo_dir_abs = os.path.abspath(os.path.expanduser(repo_dir))
if cwd == repo_dir_abs or cwd.startswith(repo_dir_abs + os.sep):
candidates.append((repo_dir_abs, repo))
if not candidates:
return []
# Pick the repo with the longest (most specific) path.
candidates.sort(key=lambda item: len(item[0]), reverse=True)
return [candidates[0][1]]
def dispatch_command(args, ctx: CLIContext) -> None:
"""
Top-level command dispatcher.
Responsible for:
- computing selected repositories (where applicable)
- delegating to the correct command handler module
Dispatch the parsed arguments to the appropriate command handler.
"""
# 1) Proxy commands (git, docker, docker compose) short-circuit.
# First: proxy commands (git / docker / docker compose / make wrapper etc.)
if maybe_handle_proxy(args, ctx):
return
# 2) Determine if this command uses repository selection.
# Commands that operate on repository selections
commands_with_selection: List[str] = [
"install",
"update",
@@ -41,26 +99,33 @@ def dispatch_command(args, ctx: CLIContext) -> None:
"status",
"path",
"shell",
"code",
"explore",
"terminal",
"create",
"list",
"make",
"release",
"version",
"make",
"changelog",
# intentionally NOT "branch"; it operates on the cwd only
"explore",
"terminal",
"code",
]
if args.command in commands_with_selection:
selected = get_selected_repos(
getattr(args, "all", False),
ctx.all_repositories,
getattr(args, "identifiers", []),
)
if getattr(args, "command", None) in commands_with_selection:
if _has_explicit_selection(args):
# Classic selection logic (identifiers / --all / filters)
selected = get_selected_repos(args, ctx.all_repositories)
else:
# Default per help text: repository of current folder.
selected = _select_repo_for_current_directory(ctx)
# If none is found, leave 'selected' empty.
# Individual handlers will then emit a clear message instead
# of silently picking an unrelated repository.
else:
selected = []
# 3) Delegate based on command.
# ------------------------------------------------------------------ #
# Repos-related commands
# ------------------------------------------------------------------ #
if args.command in (
"install",
"update",
@@ -73,22 +138,41 @@ def dispatch_command(args, ctx: CLIContext) -> None:
"list",
):
handle_repos_command(args, ctx, selected)
elif args.command in ("code", "explore", "terminal"):
return
# ------------------------------------------------------------------ #
# Tools (explore / terminal / code)
# ------------------------------------------------------------------ #
if args.command in ("explore", "terminal", "code"):
handle_tools_command(args, ctx, selected)
elif args.command == "release":
return
# ------------------------------------------------------------------ #
# Release / Version / Changelog / Config / Make / Branch
# ------------------------------------------------------------------ #
if args.command == "release":
handle_release(args, ctx, selected)
elif args.command == "version":
return
if args.command == "version":
handle_version(args, ctx, selected)
elif args.command == "changelog":
return
if args.command == "changelog":
handle_changelog(args, ctx, selected)
elif args.command == "config":
return
if args.command == "config":
handle_config(args, ctx)
elif args.command == "make":
return
if args.command == "make":
handle_make(args, ctx, selected)
elif args.command == "branch":
# Branch commands currently operate on the current working
# directory only, not on the pkgmgr repository selection.
return
if args.command == "branch":
handle_branch(args, ctx)
else:
print(f"Unknown command: {args.command}")
sys.exit(2)
return
print(f"Unknown command: {args.command}")
sys.exit(2)
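The cwd-based fallback in `_select_repo_for_current_directory` picks the repository whose directory is the longest (most specific) parent of the current working directory. A standalone illustration of that rule, with made-up paths:

```python
import os
from typing import List, Optional


def pick_repo_dir(cwd: str, repo_dirs: List[str]) -> Optional[str]:
    """Return the longest repo directory that equals cwd or is a parent of it."""
    hits = [d for d in repo_dirs if cwd == d or cwd.startswith(d + os.sep)]
    return max(hits, key=len) if hits else None


dirs = ["/home/kevin/Repositories", "/home/kevin/Repositories/foo"]
print(pick_repo_dir("/home/kevin/Repositories/foo/subdir", dirs))
# /home/kevin/Repositories/foo
print(pick_repo_dir("/tmp/elsewhere", dirs))
# None
```

Sorting by path length guarantees that a nested repository wins over a parent directory that also happens to be configured as a repository.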


@@ -1,3 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import argparse
@@ -12,13 +15,20 @@ class SortedSubParsersAction(argparse._SubParsersAction):
def add_parser(self, name, **kwargs):
parser = super().add_parser(name, **kwargs)
# Sort choices alphabetically by dest (subcommand name)
self._choices_actions.sort(key=lambda a: a.dest)
return parser
def add_identifier_arguments(subparser: argparse.ArgumentParser) -> None:
"""
Attach generic repository selection arguments to a subparser.
Common identifier / selection arguments for many subcommands.
Selection modes (mutually exclusive by intent, not hard-enforced):
- identifiers (positional): select by alias / provider/account/repo
- --all: select all repositories
- --category / --string / --tag: filter-based selection on top
of the full repository set
"""
subparser.add_argument(
"identifiers",
@@ -39,6 +49,33 @@ def add_identifier_arguments(subparser: argparse.ArgumentParser) -> None:
"yes | pkgmgr {subcommand} --all"
),
)
subparser.add_argument(
"--category",
nargs="+",
default=[],
help=(
"Filter repositories by category patterns derived from config "
"filenames or repo metadata (use filename without .yml/.yaml, "
"or /regex/ to use a regular expression)."
),
)
subparser.add_argument(
"--string",
default="",
help=(
"Filter repositories whose identifier / name / path contains this "
"substring (case-insensitive). Use /regex/ for regular expressions."
),
)
subparser.add_argument(
"--tag",
action="append",
default=[],
help=(
"Filter repositories by tag. Matches tags from the repository "
"collector and category tags. Use /regex/ for regular expressions."
),
)
subparser.add_argument(
"--preview",
action="store_true",
@@ -61,7 +98,7 @@ def add_identifier_arguments(subparser: argparse.ArgumentParser) -> None:
def add_install_update_arguments(subparser: argparse.ArgumentParser) -> None:
"""
Attach shared flags for install/update-like commands.
Common arguments for install/update commands.
"""
add_identifier_arguments(subparser)
subparser.add_argument(
@@ -94,10 +131,7 @@ def add_install_update_arguments(subparser: argparse.ArgumentParser) -> None:
def create_parser(description_text: str) -> argparse.ArgumentParser:
"""
Create and configure the top-level argument parser for pkgmgr.
This function defines *only* the CLI surface (arguments & subcommands),
but no business logic.
Create the top-level argument parser for pkgmgr.
"""
parser = argparse.ArgumentParser(
description=description_text,
@@ -110,7 +144,7 @@ def create_parser(description_text: str) -> argparse.ArgumentParser:
)
# ------------------------------------------------------------
# install / update
# install / update / deinstall / delete
# ------------------------------------------------------------
install_parser = subparsers.add_parser(
"install",
@@ -129,9 +163,6 @@ def create_parser(description_text: str) -> argparse.ArgumentParser:
help="Include system update commands",
)
# ------------------------------------------------------------
# deinstall / delete
# ------------------------------------------------------------
deinstall_parser = subparsers.add_parser(
"deinstall",
help="Remove alias links to repository/repositories",
@@ -147,7 +178,7 @@ def create_parser(description_text: str) -> argparse.ArgumentParser:
# ------------------------------------------------------------
# create
# ------------------------------------------------------------
create_parser = subparsers.add_parser(
create_cmd_parser = subparsers.add_parser(
"create",
help=(
"Create new repository entries: add them to the config if not "
@@ -155,8 +186,8 @@ def create_parser(description_text: str) -> argparse.ArgumentParser:
"remotely if --remote is set."
),
)
add_identifier_arguments(create_parser)
create_parser.add_argument(
add_identifier_arguments(create_cmd_parser)
create_cmd_parser.add_argument(
"--remote",
action="store_true",
help="If set, add the remote and push the initial commit.",
@@ -228,6 +259,14 @@ def create_parser(description_text: str) -> argparse.ArgumentParser:
help="Set ignore to true or false",
)
config_subparsers.add_parser(
"update",
help=(
"Update default config files in ~/.config/pkgmgr/ from the "
"installed pkgmgr package (does not touch config.yaml)."
),
)
# ------------------------------------------------------------
# path / explore / terminal / code / shell
# ------------------------------------------------------------
@@ -265,7 +304,10 @@ def create_parser(description_text: str) -> argparse.ArgumentParser:
"--command",
nargs=argparse.REMAINDER,
dest="shell_command",
help="The shell command (and its arguments) to execute in each repository",
help=(
"The shell command (and its arguments) to execute in each "
"repository"
),
default=[],
)
@@ -274,7 +316,7 @@ def create_parser(description_text: str) -> argparse.ArgumentParser:
# ------------------------------------------------------------
branch_parser = subparsers.add_parser(
"branch",
help="Branch-related utilities (e.g. open feature branches)",
help="Branch-related utilities (e.g. open/close feature branches)",
)
branch_subparsers = branch_parser.add_subparsers(
dest="subcommand",
@@ -289,7 +331,10 @@ def create_parser(description_text: str) -> argparse.ArgumentParser:
branch_open.add_argument(
"name",
nargs="?",
help="Name of the new branch (optional; will be asked interactively if omitted)",
help=(
"Name of the new branch (optional; will be asked interactively "
"if omitted)"
),
)
branch_open.add_argument(
"--base",
@@ -297,6 +342,27 @@ def create_parser(description_text: str) -> argparse.ArgumentParser:
help="Base branch to create the new branch from (default: main)",
)
branch_close = branch_subparsers.add_parser(
"close",
help="Merge a feature branch into base and delete it",
)
branch_close.add_argument(
"name",
nargs="?",
help=(
"Name of the branch to close (optional; current branch is used "
"if omitted)"
),
)
branch_close.add_argument(
"--base",
default="main",
help=(
"Base branch to merge into (default: main; falls back to master "
"internally if main does not exist)"
),
)
# ------------------------------------------------------------
# release
# ------------------------------------------------------------
@@ -349,7 +415,8 @@ def create_parser(description_text: str) -> argparse.ArgumentParser:
"version",
help=(
"Show version information for repository/ies "
"(git tags, pyproject.toml, flake.nix, PKGBUILD, debian, spec, Ansible Galaxy)."
"(git tags, pyproject.toml, flake.nix, PKGBUILD, debian, spec, "
"Ansible Galaxy)."
),
)
add_identifier_arguments(version_parser)
@@ -383,20 +450,29 @@ def create_parser(description_text: str) -> argparse.ArgumentParser:
"list",
help="List all repositories with details and status",
)
list_parser.add_argument(
"--search",
default="",
help="Filter repositories that contain the given string",
)
# same selection logic as install/update/etc.:
add_identifier_arguments(list_parser)
list_parser.add_argument(
"--status",
type=str,
default="",
help="Filter repositories by status (case insensitive)",
help=(
"Filter repositories by status (case insensitive). "
"Use /regex/ for regular expressions."
),
)
list_parser.add_argument(
"--description",
action="store_true",
help=(
"Show an additional detailed section per repository "
"(description, homepage, tags, categories, paths)."
),
)
# ------------------------------------------------------------
# make (wrapper around make in repositories)
# make
# ------------------------------------------------------------
make_parser = subparsers.add_parser(
"make",
@@ -422,7 +498,7 @@ def create_parser(description_text: str) -> argparse.ArgumentParser:
add_identifier_arguments(make_deinstall)
# ------------------------------------------------------------
# Proxy commands (git, docker, docker compose)
# Proxy commands (git, docker, docker compose, ...)
# ------------------------------------------------------------
register_proxy_commands(subparsers)


@@ -1,14 +1,19 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import argparse
import os
import sys
from typing import Dict, List
from typing import Dict, List, Any
from pkgmgr.cli_core.context import CLIContext
from pkgmgr.clone_repos import clone_repos
from pkgmgr.exec_proxy_command import exec_proxy_command
from pkgmgr.get_selected_repos import get_selected_repos
from pkgmgr.pull_with_verification import pull_with_verification
from pkgmgr.get_selected_repos import get_selected_repos
from pkgmgr.get_repo_dir import get_repo_dir
PROXY_COMMANDS: Dict[str, List[str]] = {
@@ -42,10 +47,7 @@ PROXY_COMMANDS: Dict[str, List[str]] = {
def _add_proxy_identifier_arguments(parser: argparse.ArgumentParser) -> None:
"""
Local copy of the identifier argument set for proxy commands.
This duplicates the semantics of cli.parser.add_identifier_arguments
to avoid circular imports.
Selection arguments for proxy subcommands.
"""
parser.add_argument(
"identifiers",
@@ -66,6 +68,24 @@ def _add_proxy_identifier_arguments(parser: argparse.ArgumentParser) -> None:
"yes | pkgmgr {subcommand} --all"
),
)
parser.add_argument(
"--category",
nargs="+",
default=[],
help=(
"Filter repositories by category patterns derived from config "
"filenames or repo metadata (use filename without .yml/.yaml, "
"or /regex/ to use a regular expression)."
),
)
parser.add_argument(
"--string",
default="",
help=(
"Filter repositories whose identifier / name / path contains this "
"substring (case-insensitive). Use /regex/ for regular expressions."
),
)
parser.add_argument(
"--preview",
action="store_true",
@@ -86,12 +106,62 @@ def _add_proxy_identifier_arguments(parser: argparse.ArgumentParser) -> None:
)
def _proxy_has_explicit_selection(args: argparse.Namespace) -> bool:
"""
Same semantics as in the main dispatch:
True if the user explicitly selected repositories.
"""
identifiers = getattr(args, "identifiers", []) or []
use_all = getattr(args, "all", False)
categories = getattr(args, "category", []) or []
string_filter = getattr(args, "string", "") or ""
# Proxy commands currently do not support --tag, so it is not checked here.
return bool(
use_all
or identifiers
or categories
or string_filter
)
def _select_repo_for_current_directory(
ctx: CLIContext,
) -> List[Dict[str, Any]]:
"""
Heuristic: find the repository whose local directory matches the
current working directory or is the closest parent.
"""
cwd = os.path.abspath(os.getcwd())
candidates: List[tuple[str, Dict[str, Any]]] = []
for repo in ctx.all_repositories:
repo_dir = repo.get("directory")
if not repo_dir:
try:
repo_dir = get_repo_dir(ctx.repositories_base_dir, repo)
except Exception:
repo_dir = None
if not repo_dir:
continue
repo_dir_abs = os.path.abspath(os.path.expanduser(repo_dir))
if cwd == repo_dir_abs or cwd.startswith(repo_dir_abs + os.sep):
candidates.append((repo_dir_abs, repo))
if not candidates:
return []
# Pick the repo with the longest (most specific) path.
candidates.sort(key=lambda item: len(item[0]), reverse=True)
return [candidates[0][1]]
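The closest-parent heuristic above can be sketched in isolation. This is a simplified standalone model (the function name `select_for_cwd` and all paths are illustrative, not part of pkgmgr):

```python
from __future__ import annotations

import os


def select_for_cwd(cwd: str, repo_dirs: list[str]) -> str | None:
    """Return the repo dir that equals cwd or is its closest parent.

    Candidates are directories where cwd is the directory itself or
    lies somewhere below it; the longest (most specific) path wins,
    mirroring the sort in _select_repo_for_current_directory.
    """
    candidates = [
        d for d in repo_dirs
        if cwd == d or cwd.startswith(d + os.sep)
    ]
    if not candidates:
        return None
    return max(candidates, key=len)
```

With nested checkouts such as `/r/gh/alice` and `/r/gh/alice/tool`, a shell sitting in `/r/gh/alice/tool/src` resolves to the inner repository.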
def register_proxy_commands(
subparsers: argparse._SubParsersAction,
) -> None:
"""
Register proxy commands (git, docker, docker compose) as
top-level subcommands on the given subparsers.
Register proxy subcommands for git, docker, docker compose, ...
"""
for command, subcommands in PROXY_COMMANDS.items():
for subcommand in subcommands:
@@ -100,7 +170,8 @@ def register_proxy_commands(
help=f"Proxies '{command} {subcommand}' to repository/ies",
description=(
f"Executes '{command} {subcommand}' for the "
"identified repos.\nTo recieve more help execute "
"selected repositories. "
"For more details see the underlying tool's help: "
f"'{command} {subcommand} --help'"
),
formatter_class=argparse.RawTextHelpFormatter,
@@ -129,8 +200,8 @@ def register_proxy_commands(
def maybe_handle_proxy(args: argparse.Namespace, ctx: CLIContext) -> bool:
"""
If the parsed command is a proxy command, execute it and return True.
Otherwise return False to let the main dispatcher continue.
If the top-level command is one of the proxy subcommands
(git / docker / docker compose), handle it here and return True.
"""
all_proxy_subcommands = {
sub for subs in PROXY_COMMANDS.values() for sub in subs
@@ -139,12 +210,17 @@ def maybe_handle_proxy(args: argparse.Namespace, ctx: CLIContext) -> bool:
if args.command not in all_proxy_subcommands:
return False
# Use generic selection semantics for proxies
selected = get_selected_repos(
getattr(args, "all", False),
ctx.all_repositories,
getattr(args, "identifiers", []),
)
# Default semantics: without explicit selection → repo of current folder.
if _proxy_has_explicit_selection(args):
selected = get_selected_repos(args, ctx.all_repositories)
else:
selected = _select_repo_for_current_directory(ctx)
if not selected:
print(
"[ERROR] No repository matches the current directory. "
"Specify identifiers or use --all/--category/--string."
)
sys.exit(1)
for command, subcommands in PROXY_COMMANDS.items():
if args.command not in subcommands:


@@ -1,46 +1,122 @@
import subprocess
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Initialize user configuration by scanning the repositories base directory.
This module scans the path:
defaults_config["directories"]["repositories"]
with the expected structure:
{base}/{provider}/{account}/{repository}
For each discovered repository, the function:
• derives provider, account, repository from the folder structure
• (optionally) determines the latest commit hash via git log
• generates a unique CLI alias
• marks ignore=True for newly discovered repos
• skips repos already known in defaults or user config
"""
from __future__ import annotations
import os
import subprocess
from typing import Any, Dict
from pkgmgr.generate_alias import generate_alias
from pkgmgr.save_user_config import save_user_config
def config_init(user_config, defaults_config, bin_dir,USER_CONFIG_PATH:str):
def config_init(
user_config: Dict[str, Any],
defaults_config: Dict[str, Any],
bin_dir: str,
user_config_path: str,
) -> None:
"""
Scan the base directory (defaults_config["base"]) for repositories.
The folder structure is assumed to be:
{base}/{provider}/{account}/{repository}
For each repository found, automatically determine:
- provider, account, repository from folder names.
- verified: the latest commit (via 'git log -1 --format=%H').
- alias: generated from the repository name using generate_alias().
Repositories already defined in defaults_config["repositories"] or user_config["repositories"] are skipped.
Scan the repositories base directory and add missing entries
to the user configuration.
"""
repositories_base_dir = os.path.expanduser(defaults_config["directories"]["repositories"])
# ------------------------------------------------------------
# Announce where we will write the result
# ------------------------------------------------------------
print("============================================================")
print("[INIT] Writing user configuration to:")
print(f" {user_config_path}")
print("============================================================")
repositories_base_dir = os.path.expanduser(
defaults_config["directories"]["repositories"]
)
print("[INIT] Scanning repository base directory:")
print(f" {repositories_base_dir}")
print("")
if not os.path.isdir(repositories_base_dir):
print(f"Base directory '{repositories_base_dir}' does not exist.")
print(f"[ERROR] Base directory does not exist: {repositories_base_dir}")
return
default_keys = {(entry.get("provider"), entry.get("account"), entry.get("repository"))
for entry in defaults_config.get("repositories", [])}
existing_keys = {(entry.get("provider"), entry.get("account"), entry.get("repository"))
for entry in user_config.get("repositories", [])}
existing_aliases = {entry.get("alias") for entry in user_config.get("repositories", []) if entry.get("alias")}
default_keys = {
(entry.get("provider"), entry.get("account"), entry.get("repository"))
for entry in defaults_config.get("repositories", [])
}
existing_keys = {
(entry.get("provider"), entry.get("account"), entry.get("repository"))
for entry in user_config.get("repositories", [])
}
existing_aliases = {
entry.get("alias")
for entry in user_config.get("repositories", [])
if entry.get("alias")
}
new_entries = []
scanned = 0
skipped = 0
# ------------------------------------------------------------
# Actual scanning
# ------------------------------------------------------------
for provider in os.listdir(repositories_base_dir):
provider_path = os.path.join(repositories_base_dir, provider)
if not os.path.isdir(provider_path):
continue
print(f"[SCAN] Provider: {provider}")
for account in os.listdir(provider_path):
account_path = os.path.join(provider_path, account)
if not os.path.isdir(account_path):
continue
print(f"[SCAN] Account: {account}")
for repo_name in os.listdir(account_path):
repo_path = os.path.join(account_path, repo_name)
if not os.path.isdir(repo_path):
continue
scanned += 1
key = (provider, account, repo_name)
if key in default_keys or key in existing_keys:
# Already known?
if key in default_keys:
skipped += 1
print(f"[SKIP] (defaults) {provider}/{account}/{repo_name}")
continue
if key in existing_keys:
skipped += 1
print(f"[SKIP] (user-config) {provider}/{account}/{repo_name}")
continue
print(f"[ADD] {provider}/{account}/{repo_name}")
# Determine commit hash
try:
result = subprocess.run(
["git", "log", "-1", "--format=%H"],
@@ -51,25 +127,55 @@ def config_init(user_config, defaults_config, bin_dir,USER_CONFIG_PATH:str):
check=True,
)
verified = result.stdout.strip()
except Exception as e:
print(f"[INFO] Latest commit: {verified}")
except Exception as exc:
verified = ""
print(f"Could not determine latest commit for {repo_name} ({provider}/{account}): {e}")
print(f"[WARN] Could not read commit: {exc}")
entry = {
"provider": provider,
"account": account,
"repository": repo_name,
"verified": {"commit": verified},
"ignore": True
"ignore": True,
}
alias = generate_alias({"repository": repo_name, "provider": provider, "account": account}, bin_dir, existing_aliases)
# Alias generation
alias = generate_alias(
{
"repository": repo_name,
"provider": provider,
"account": account,
},
bin_dir,
existing_aliases,
)
entry["alias"] = alias
existing_aliases.add(alias)
new_entries.append(entry)
print(f"Adding new repo entry: {entry}")
print(f"[INFO] Alias generated: {alias}")
new_entries.append(entry)
print("") # blank line between accounts
# ------------------------------------------------------------
# Summary
# ------------------------------------------------------------
print("============================================================")
print(f"[DONE] Scanned repositories: {scanned}")
print(f"[DONE] Skipped (known): {skipped}")
print(f"[DONE] New entries discovered: {len(new_entries)}")
print("============================================================")
# ------------------------------------------------------------
# Save if needed
# ------------------------------------------------------------
if new_entries:
user_config.setdefault("repositories", []).extend(new_entries)
save_user_config(user_config,USER_CONFIG_PATH)
save_user_config(user_config, user_config_path)
print("[SAVE] Wrote user configuration to:")
print(f" {user_config_path}")
else:
print("No new repositories found.")
print("[INFO] No new repositories were added.")
print("============================================================")
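The `{base}/{provider}/{account}/{repository}` layout the scanner walks can be illustrated by splitting a repository path back into its parts (a sketch; `split_repo_path` and the example paths are hypothetical):

```python
import os


def split_repo_path(base: str, repo_path: str) -> tuple:
    """Invert the {base}/{provider}/{account}/{repository} layout."""
    rel = os.path.relpath(repo_path, base)
    # Exactly three path components are expected below the base dir.
    provider, account, repository = rel.split(os.sep)
    return provider, account, repository
```

This is the same triple the scanner uses as the dedup key against `defaults_config` and `user_config`.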


@@ -1,29 +1,170 @@
import os
import sys
from .resolve_repos import resolve_repos
from .filter_ignored import filter_ignored
from .get_repo_dir import get_repo_dir
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import os
import re
from typing import Any, Dict, List, Sequence
from pkgmgr.resolve_repos import resolve_repos
Repository = Dict[str, Any]
def _compile_maybe_regex(pattern: str):
"""
If pattern is of the form /.../, return a compiled regex (case-insensitive).
Otherwise return None.
"""
if len(pattern) >= 2 and pattern.startswith("/") and pattern.endswith("/"):
try:
return re.compile(pattern[1:-1], re.IGNORECASE)
except re.error:
return None
return None
def _match_pattern(value: str, pattern: str) -> bool:
"""
Match a value against a pattern that may be a substring or /regex/.
"""
if not pattern:
return True
regex = _compile_maybe_regex(pattern)
if regex:
return bool(regex.search(value))
return pattern.lower() in value.lower()
def _match_any(values: Sequence[str], pattern: str) -> bool:
"""
Return True if any of the values matches the pattern.
"""
for v in values:
if _match_pattern(v, pattern):
return True
return False
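The substring-vs-`/regex/` convention used by these helpers can be demonstrated standalone. This sketch inlines `_compile_maybe_regex` and `_match_pattern` into one function; as a simplification, an invalid `/regex/` simply fails to match here, whereas the helpers above fall back to substring matching:

```python
import re


def match_pattern(value: str, pattern: str) -> bool:
    """Empty pattern matches everything; /.../ is a case-insensitive
    regex; anything else is a case-insensitive substring test."""
    if not pattern:
        return True
    if len(pattern) >= 2 and pattern.startswith("/") and pattern.endswith("/"):
        try:
            return bool(re.search(pattern[1:-1], value, re.IGNORECASE))
        except re.error:
            # Simplification: invalid regex matches nothing.
            return False
    return pattern.lower() in value.lower()
```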
def _build_identifier_string(repo: Repository) -> str:
"""
Build a combined identifier string for string-based filtering.
"""
provider = str(repo.get("provider", ""))
account = str(repo.get("account", ""))
repository = str(repo.get("repository", ""))
alias = str(repo.get("alias", ""))
description = str(repo.get("description", ""))
directory = str(repo.get("directory", ""))
parts = [
provider,
account,
repository,
alias,
f"{provider}/{account}/{repository}",
description,
directory,
]
return " ".join(p for p in parts if p)
def _apply_filters(
repos: List[Repository],
string_pattern: str,
category_patterns: List[str],
tag_patterns: List[str],
) -> List[Repository]:
if not string_pattern and not category_patterns and not tag_patterns:
return repos
filtered: List[Repository] = []
for repo in repos:
# String filter
if string_pattern:
ident_str = _build_identifier_string(repo)
if not _match_pattern(ident_str, string_pattern):
continue
# Category filter: real categories only, NOT tags
if category_patterns:
cats: List[str] = []
cats.extend(map(str, repo.get("category_files", [])))
if "category" in repo:
cats.append(str(repo["category"]))
if not cats:
continue
ok = True
for pat in category_patterns:
if not _match_any(cats, pat):
ok = False
break
if not ok:
continue
# Tag filter: YAML tags only
if tag_patterns:
tags: List[str] = list(map(str, repo.get("tags", [])))
if not tags:
continue
ok = True
for pat in tag_patterns:
if not _match_any(tags, pat):
ok = False
break
if not ok:
continue
filtered.append(repo)
return filtered
def get_selected_repos(show_all: bool, all_repos_list, identifiers=None):
if show_all:
selected = all_repos_list
else:
selected = resolve_repos(identifiers, all_repos_list)
# If no repositories were found using the provided identifiers,
# try to automatically select based on the current directory:
if not selected:
current_dir = os.getcwd()
directory_name = os.path.basename(current_dir)
# Pack the directory name in a list since resolve_repos expects a list.
auto_selected = resolve_repos([directory_name], all_repos_list)
if auto_selected:
# Check if the path of the first auto-selected repository matches the current directory.
if os.path.abspath(auto_selected[0].get("directory")) == os.path.abspath(current_dir):
print(f"Repository {auto_selected[0]['repository']} has been auto-selected by path.")
selected = auto_selected
filtered = filter_ignored(selected)
if not filtered:
print("Error: No repositories had been selected.")
sys.exit(4)
return filtered
def get_selected_repos(args, all_repositories: List[Repository]) -> List[Repository]:
"""
Compute the list of repositories selected by CLI arguments.
Modes:
- If identifiers are given: select via resolve_repos() from all_repositories.
- Else if any of --category/--string/--tag is used: start from all_repositories
and apply filters.
- Else if --all is set: select all_repositories.
- Else: try to select the repository of the current working directory.
"""
identifiers: List[str] = getattr(args, "identifiers", []) or []
use_all: bool = bool(getattr(args, "all", False))
category_patterns: List[str] = getattr(args, "category", []) or []
string_pattern: str = getattr(args, "string", "") or ""
tag_patterns: List[str] = getattr(args, "tag", []) or []
has_filters = bool(category_patterns or string_pattern or tag_patterns)
# 1) Explicit identifiers win
if identifiers:
base = resolve_repos(identifiers, all_repositories)
return _apply_filters(base, string_pattern, category_patterns, tag_patterns)
# 2) Filter-only mode: start from all repositories
if has_filters:
return _apply_filters(list(all_repositories), string_pattern, category_patterns, tag_patterns)
# 3) --all (no filters): all repos
if use_all:
return list(all_repositories)
# 4) Fallback: try to select repository of current working directory
cwd = os.path.abspath(os.getcwd())
by_dir = [
repo
for repo in all_repositories
if os.path.abspath(str(repo.get("directory", ""))) == cwd
]
if by_dir:
return by_dir
# No specific match -> empty list
return []
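The precedence documented in the docstring can be condensed into a small helper (illustrative only; `selection_mode` does not exist in pkgmgr):

```python
def selection_mode(identifiers, use_all, categories, string, tags) -> str:
    """Return which branch get_selected_repos() would take."""
    if identifiers:
        return "identifiers"  # 1) explicit identifiers win
    if categories or string or tags:
        return "filters"      # 2) filter-only mode over all repositories
    if use_all:
        return "all"          # 3) --all without filters
    return "cwd"              # 4) current-working-directory fallback
```

Note that filters take precedence over `--all`: `pkgmgr list --all --category web` behaves like a pure category filter.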


@@ -1,108 +1,352 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Pretty-print repository list with status, categories, tags and path.
- Tags come exclusively from YAML: repo["tags"].
- Categories come from repo["category_files"] (YAML file names without
.yml/.yaml) and optional repo["category"].
- Optional detail mode (--description) prints an extended section per
repository with description, homepage, etc.
"""
from __future__ import annotations
import os
from pkgmgr.get_repo_identifier import get_repo_identifier
from pkgmgr.get_repo_dir import get_repo_dir
import re
from textwrap import wrap
from typing import Any, Dict, List, Optional
def list_repositories(all_repos, repositories_base_dir, bin_dir, search_filter="", status_filter=""):
Repository = Dict[str, Any]
RESET = "\033[0m"
BOLD = "\033[1m"
DIM = "\033[2m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
RED = "\033[31m"
MAGENTA = "\033[35m"
GREY = "\033[90m"
def _compile_maybe_regex(pattern: str) -> Optional[re.Pattern[str]]:
"""
Lists all repositories with their attributes and status information.
The repositories are sorted in ascending order by their identifier.
Parameters:
all_repos (list): List of repository configurations.
repositories_base_dir (str): The base directory where repositories are located.
bin_dir (str): The directory where executable wrappers are stored.
search_filter (str): Filter for repository attributes (case insensitive).
status_filter (str): Filter for computed status info (case insensitive).
For each repository, the identifier is printed in bold, the description (if available)
in italic, then all other attributes and computed status are printed.
If the repository is installed, a hint is displayed under the attributes.
Repositories are filtered out if either the search_filter is not found in any attribute or
if the status_filter is not found in the computed status string.
If pattern is of the form /.../, return a compiled regex (case-insensitive).
Otherwise return None.
"""
search_filter = search_filter.lower() if search_filter else ""
status_filter = status_filter.lower() if status_filter else ""
if not pattern:
return None
if len(pattern) >= 2 and pattern.startswith("/") and pattern.endswith("/"):
try:
return re.compile(pattern[1:-1], re.IGNORECASE)
except re.error:
return None
return None
# Define status colors using colors not used for other attributes:
# Avoid red (for ignore), blue (for homepage) and yellow (for verified).
status_colors = {
"Installed": "\033[1;32m", # Green
"Not Installed": "\033[1;35m", # Magenta
"Cloned": "\033[1;36m", # Cyan
"Clonable": "\033[1;37m", # White
"Ignored": "\033[38;5;208m", # Orange (extended)
"Active": "\033[38;5;129m", # Light Purple (extended)
"Installable": "\033[38;5;82m" # Light Green (extended)
}
# Sort all repositories by their identifier in ascending order.
sorted_repos = sorted(all_repos, key=lambda repo: get_repo_identifier(repo, all_repos))
def _status_matches(status: str, status_filter: str) -> bool:
"""
Match a status string against an optional filter (substring or /regex/).
"""
if not status_filter:
return True
for repo in sorted_repos:
# Combine all attribute values into one string for filtering.
repo_text = " ".join(str(v) for v in repo.values()).lower()
if search_filter and search_filter not in repo_text:
regex = _compile_maybe_regex(status_filter)
if regex:
return bool(regex.search(status))
return status_filter.lower() in status.lower()
def _compute_repo_dir(repositories_base_dir: str, repo: Repository) -> str:
"""
Compute the local directory for a repository.
If the repository already has a 'directory' key, that is used;
otherwise the path is constructed from provider/account/repository
under repositories_base_dir.
"""
if repo.get("directory"):
return os.path.expanduser(str(repo["directory"]))
provider = str(repo.get("provider", ""))
account = str(repo.get("account", ""))
repository = str(repo.get("repository", ""))
return os.path.join(
os.path.expanduser(repositories_base_dir),
provider,
account,
repository,
)
def _compute_status(
repo: Repository,
repo_dir: str,
binaries_dir: str,
) -> str:
"""
Compute a human-readable status string, e.g. 'present,alias,ignored'.
"""
parts: List[str] = []
exists = os.path.isdir(repo_dir)
if exists:
parts.append("present")
else:
parts.append("absent")
alias = repo.get("alias")
if alias:
alias_path = os.path.join(os.path.expanduser(binaries_dir), str(alias))
if os.path.exists(alias_path):
parts.append("alias")
else:
parts.append("alias-missing")
if repo.get("ignore"):
parts.append("ignored")
return ",".join(parts) if parts else "-"
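A minimal model of the flag composition above (names taken from the code; the boolean parameters stand in for the filesystem checks):

```python
def compute_status(present: bool, has_alias: bool,
                   alias_on_disk: bool, ignored: bool) -> str:
    """Compose the comma-separated status string used by the table."""
    parts = ["present" if present else "absent"]
    if has_alias:
        parts.append("alias" if alias_on_disk else "alias-missing")
    if ignored:
        parts.append("ignored")
    return ",".join(parts)
```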
def _color_status(status_padded: str) -> str:
"""
Color individual status flags inside a padded status string.
Input is expected to be right-padded to the column width.
Color mapping:
- present -> green
- absent -> red
- alias -> red
- alias-missing -> red
- ignored -> magenta
- other -> default
"""
core = status_padded.rstrip()
pad_spaces = len(status_padded) - len(core)
plain_parts = core.split(",") if core else []
colored_parts: List[str] = []
for raw_part in plain_parts:
name = raw_part.strip()
if not name:
continue
# Compute status information for the repository.
identifier = get_repo_identifier(repo, all_repos)
executable_path = os.path.join(bin_dir, identifier)
repo_dir = get_repo_dir(repositories_base_dir, repo)
status_list = []
# Check if the executable exists (Installed).
if os.path.exists(executable_path):
status_list.append("Installed")
if name == "present":
color = GREEN
elif name == "absent":
color = MAGENTA
elif name in ("alias", "alias-missing"):
color = YELLOW
elif name == "ignored":
color = MAGENTA
else:
status_list.append("Not Installed")
# Check if the repository directory exists (Cloned).
if os.path.exists(repo_dir):
status_list.append("Cloned")
else:
status_list.append("Clonable")
# Mark ignored repositories.
if repo.get("ignore", False):
status_list.append("Ignored")
else:
status_list.append("Active")
# Define installable as cloned but not installed.
if os.path.exists(repo_dir) and not os.path.exists(executable_path):
status_list.append("Installable")
color = ""
# Build a colored status string.
colored_statuses = [f"{status_colors.get(s, '')}{s}\033[0m" for s in status_list]
status_str = ", ".join(colored_statuses)
if color:
colored_parts.append(f"{color}{name}{RESET}")
else:
colored_parts.append(name)
# If a status_filter is provided, only display repos whose status contains the filter.
if status_filter and status_filter not in status_str.lower():
colored_core = ",".join(colored_parts)
return colored_core + (" " * pad_spaces)
def list_repositories(
repositories: List[Repository],
repositories_base_dir: str,
binaries_dir: str,
search_filter: str = "",
status_filter: str = "",
extra_tags: Optional[List[str]] = None,
show_description: bool = False,
) -> None:
"""
Print a table of repositories and (optionally) detailed descriptions.
Parameters
----------
repositories:
Repositories to show (usually already filtered by get_selected_repos).
repositories_base_dir:
Base directory where repositories live.
binaries_dir:
Directory where alias symlinks live.
search_filter:
Optional substring/regex filter on identifier and metadata.
status_filter:
Optional filter on computed status.
extra_tags:
Additional tags to show for each repository (CLI overlay only).
show_description:
If True, print a detailed block for each repository after the table.
"""
if extra_tags is None:
extra_tags = []
search_regex = _compile_maybe_regex(search_filter)
rows: List[Dict[str, Any]] = []
# ------------------------------------------------------------------
# Build rows
# ------------------------------------------------------------------
for repo in repositories:
identifier = str(repo.get("repository") or repo.get("alias") or "")
alias = str(repo.get("alias") or "")
provider = str(repo.get("provider") or "")
account = str(repo.get("account") or "")
description = str(repo.get("description") or "")
homepage = str(repo.get("homepage") or "")
repo_dir = _compute_repo_dir(repositories_base_dir, repo)
status = _compute_status(repo, repo_dir, binaries_dir)
if not _status_matches(status, status_filter):
continue
# Display repository details:
# Print the identifier in bold.
print(f"\033[1m{identifier}\033[0m")
# Print the description in italic if it exists.
description = repo.get("description")
if search_filter:
haystack = " ".join(
[
identifier,
alias,
provider,
account,
description,
homepage,
repo_dir,
]
)
if search_regex:
if not search_regex.search(haystack):
continue
else:
if search_filter.lower() not in haystack.lower():
continue
categories: List[str] = []
categories.extend(map(str, repo.get("category_files", [])))
if repo.get("category"):
categories.append(str(repo["category"]))
yaml_tags: List[str] = list(map(str, repo.get("tags", [])))
display_tags: List[str] = sorted(
set(yaml_tags + list(map(str, extra_tags)))
)
rows.append(
{
"repo": repo,
"identifier": identifier,
"status": status,
"categories": categories,
"tags": display_tags,
"dir": repo_dir,
}
)
if not rows:
print("No repositories matched the given filters.")
return
# ------------------------------------------------------------------
# Table section (header grey, values white, per-flag colored status)
# ------------------------------------------------------------------
ident_width = max(len("IDENTIFIER"), max(len(r["identifier"]) for r in rows))
status_width = max(len("STATUS"), max(len(r["status"]) for r in rows))
cat_width = max(
len("CATEGORIES"),
max((len(",".join(r["categories"])) for r in rows), default=0),
)
tag_width = max(
len("TAGS"),
max((len(",".join(r["tags"])) for r in rows), default=0),
)
header = (
f"{GREY}{BOLD}"
f"{'IDENTIFIER'.ljust(ident_width)} "
f"{'STATUS'.ljust(status_width)} "
f"{'CATEGORIES'.ljust(cat_width)} "
f"{'TAGS'.ljust(tag_width)} "
f"DIR"
f"{RESET}"
)
print(header)
print("-" * (ident_width + status_width + cat_width + tag_width + 10 + 40))
for r in rows:
ident_col = r["identifier"].ljust(ident_width)
cat_col = ",".join(r["categories"]).ljust(cat_width)
tag_col = ",".join(r["tags"]).ljust(tag_width)
dir_col = r["dir"]
status = r["status"]
status_padded = status.ljust(status_width)
status_colored = _color_status(status_padded)
print(
f"{ident_col} "
f"{status_colored} "
f"{cat_col} "
f"{tag_col} "
f"{dir_col}"
)
# ------------------------------------------------------------------
# Detailed section (alias value red, same status coloring)
# ------------------------------------------------------------------
if not show_description:
return
print()
print(f"{BOLD}Detailed repository information:{RESET}")
print()
for r in rows:
repo = r["repo"]
identifier = r["identifier"]
alias = str(repo.get("alias") or "")
provider = str(repo.get("provider") or "")
account = str(repo.get("account") or "")
repository = str(repo.get("repository") or "")
description = str(repo.get("description") or "")
homepage = str(repo.get("homepage") or "")
categories = r["categories"]
tags = r["tags"]
repo_dir = r["dir"]
status = r["status"]
print(f"{BOLD}{identifier}{RESET}")
print(f" Provider: {provider}")
print(f" Account: {account}")
print(f" Repository: {repository}")
# Alias value highlighted in red
if alias:
print(f" Alias: {RED}{alias}{RESET}")
status_colored = _color_status(status)
print(f" Status: {status_colored}")
if categories:
print(f" Categories: {', '.join(categories)}")
if tags:
print(f" Tags: {', '.join(tags)}")
print(f" Directory: {repo_dir}")
if homepage:
print(f" Homepage: {homepage}")
if description:
print(f"\n\033[3m{description}\033[0m")
print("\nAttributes:")
# Loop through all attributes.
for key, value in repo.items():
formatted_value = str(value)
# Special formatting for the "verified" attribute (yellow).
if key == "verified" and value:
formatted_value = f"\033[1;33m{value}\033[0m"
# Special formatting for the "ignore" flag (red if True).
if key == "ignore" and value:
formatted_value = f"\033[1;31m{value}\033[0m"
if key == "description":
continue
# Highlight homepage in blue.
if key.lower() == "homepage" and value:
formatted_value = f"\033[1;34m{value}\033[0m"
print(f" {key}: {formatted_value}")
# Always display the computed status.
print(f" Status: {status_str}")
# If the repository is installed, display a hint for more info.
if os.path.exists(executable_path):
print(f"\nMore information and help: \033[1;4mpkgmgr {identifier} --help\033[0m\n")
print("-" * 40)
print(" Description:")
for line in wrap(description, width=78):
print(f" {line}")
print()


@@ -1,30 +1,305 @@
import sys
import yaml
import os
from .get_repo_dir import get_repo_dir
DEFAULT_CONFIG_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "../","config", "defaults.yaml")
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
def load_config(user_config_path):
"""Load configuration from defaults and merge in user config if present."""
if not os.path.exists(DEFAULT_CONFIG_PATH):
print(f"Default configuration file '{DEFAULT_CONFIG_PATH}' not found.")
sys.exit(5)
with open(DEFAULT_CONFIG_PATH, 'r') as f:
config = yaml.safe_load(f)
if "directories" not in config or "repositories" not in config:
print("Default config file must contain 'directories' and 'repositories' keys.")
sys.exit(6)
if os.path.exists(user_config_path):
with open(user_config_path, 'r') as f:
user_config = yaml.safe_load(f)
if user_config:
if "directories" in user_config:
config["directories"] = user_config["directories"]
if "repositories" in user_config:
config["repositories"].extend(user_config["repositories"])
for repository in config["repositories"]:
# You can overwritte the directory path in the config
if "directory" not in repository:
directory = get_repo_dir(config["directories"]["repositories"], repository)
repository["directory"] = os.path.expanduser(directory)
return config
"""
Load and merge pkgmgr configuration.
Layering rules:
1. Defaults / category files:
- First, all *.yml/*.yaml files (except config.yaml) in the
user config directory are loaded:
~/.config/pkgmgr/
- If no matching files exist there, the config directories
shipped with the package / project are used as a fallback:
<pkg_root>/config_defaults
<pkg_root>/config
<project_root>/config_defaults
<project_root>/config
All *.yml/*.yaml files found there are likewise loaded as layers.
- The file name without its extension (stem) is used as the
category name and recorded in repo["category_files"].
2. User config:
- ~/.config/pkgmgr/config.yaml (or the path passed in) is loaded
and merged ON TOP of the defaults via list merge:
- directories: dict deep-merge
- repositories: via _merge_repo_lists (nothing is deleted!)
3. Result:
- A dict containing at least:
config["directories"] (dict)
config["repositories"] (list[dict])
"""
from __future__ import annotations
import os
from pathlib import Path
from typing import Any, Dict, List, Tuple
import yaml
Repo = Dict[str, Any]
# ---------------------------------------------------------------------------
# Helper functions
# ---------------------------------------------------------------------------
def _deep_merge(base: Dict[str, Any], override: Dict[str, Any]) -> Dict[str, Any]:
"""
Recursively merge two dictionaries.
Values from `override` win over values in `base`.
"""
for key, value in override.items():
if (
key in base
and isinstance(base[key], dict)
and isinstance(value, dict)
):
_deep_merge(base[key], value)
else:
base[key] = value
return base
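The merge rule above can be sketched with a small standalone re-implementation (the function and sample data here are illustrative, not part of pkgmgr):

```python
# Minimal sketch of the deep-merge semantics: nested dicts are merged
# key by key, and for everything else the override value wins.
def deep_merge(base, override):
    for key, value in override.items():
        if key in base and isinstance(base[key], dict) and isinstance(value, dict):
            deep_merge(base[key], value)
        else:
            base[key] = value
    return base

base = {"directories": {"repositories": "~/repos", "cache": "~/.cache"}}
override = {"directories": {"repositories": "/srv/repos"}}
merged = deep_merge(base, override)
# "repositories" is overridden, "cache" survives from the base layer:
print(merged)
# → {'directories': {'repositories': '/srv/repos', 'cache': '~/.cache'}}
```

Note that the merge mutates `base` in place, which is exactly why the real `load_config` starts from fresh empty dicts before layering.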
def _repo_key(repo: Repo) -> Tuple[str, str, str]:
"""
Normalised key for identifying a repository across config files.
"""
return (
str(repo.get("provider", "")),
str(repo.get("account", "")),
str(repo.get("repository", "")),
)
def _merge_repo_lists(
base_list: List[Repo],
new_list: List[Repo],
category_name: str | None = None,
) -> List[Repo]:
"""
Merge two repository lists, matching by (provider, account, repository).
- If a repo from new_list does not exist yet, it is appended.
- If it already exists, its fields are overridden via deep merge.
- If category_name is set, it is recorded in
repo["category_files"].
"""
index: Dict[Tuple[str, str, str], Repo] = {
_repo_key(r): r for r in base_list
}
for src in new_list:
key = _repo_key(src)
if key == ("", "", ""):
# Incomplete key -> just append
dst = dict(src)
if category_name:
dst.setdefault("category_files", [])
if category_name not in dst["category_files"]:
dst["category_files"].append(category_name)
base_list.append(dst)
continue
existing = index.get(key)
if existing is None:
dst = dict(src)
if category_name:
dst.setdefault("category_files", [])
if category_name not in dst["category_files"]:
dst["category_files"].append(category_name)
base_list.append(dst)
index[key] = dst
else:
_deep_merge(existing, src)
if category_name:
existing.setdefault("category_files", [])
if category_name not in existing["category_files"]:
existing["category_files"].append(category_name)
return base_list
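The identity-based list merge can be illustrated with a condensed, standalone sketch (simplified to a shallow `update` instead of the real deep merge; all names and sample repos are illustrative):

```python
# Sketch of the list-merge semantics: repos are matched by the
# (provider, account, repository) triple; matches are merged in place,
# unknown repos are appended, and each contributing category file is
# recorded in "category_files".
def repo_key(repo):
    return (repo.get("provider", ""), repo.get("account", ""), repo.get("repository", ""))

def merge_repo_lists(base_list, new_list, category_name=None):
    index = {repo_key(r): r for r in base_list}
    for src in new_list:
        existing = index.get(repo_key(src))
        if existing is None:
            existing = dict(src)
            base_list.append(existing)
            index[repo_key(existing)] = existing
        else:
            existing.update(src)  # simplified: shallow instead of deep merge
        if category_name:
            existing.setdefault("category_files", [])
            if category_name not in existing["category_files"]:
                existing["category_files"].append(category_name)
    return base_list

repos = []
merge_repo_lists(repos, [{"provider": "github.com", "account": "kevin", "repository": "pkgmgr"}], "tools")
merge_repo_lists(repos, [{"provider": "github.com", "account": "kevin", "repository": "pkgmgr", "verified": True}], "python")
print(repos[0]["category_files"])  # → ['tools', 'python']
```

Because the second layer matches the same key, the list still contains exactly one entry, now carrying both category names and the extra `verified` field.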
def _load_yaml_file(path: Path) -> Dict[str, Any]:
"""
Load a single YAML file as dict. Non-dicts yield {}.
"""
if not path.is_file():
return {}
with path.open("r", encoding="utf-8") as f:
data = yaml.safe_load(f) or {}
if not isinstance(data, dict):
return {}
return data
def _load_layer_dir(
config_dir: Path,
skip_filename: str | None = None,
) -> Dict[str, Any]:
"""
Load all *.yml/*.yaml from a directory as layered defaults.
- skip_filename: file name (e.g. "config.yaml") that should be
skipped (e.g. the user config).
Returns:
{
"directories": {...},
"repositories": [...],
}
"""
defaults: Dict[str, Any] = {"directories": {}, "repositories": []}
if not config_dir.is_dir():
return defaults
yaml_files = [
p
for p in config_dir.iterdir()
if p.is_file()
and p.suffix.lower() in (".yml", ".yaml")
and (skip_filename is None or p.name != skip_filename)
]
if not yaml_files:
return defaults
yaml_files.sort(key=lambda p: p.name)
for path in yaml_files:
data = _load_yaml_file(path)
category_name = path.stem  # file name without .yml/.yaml
dirs = data.get("directories")
if isinstance(dirs, dict):
defaults.setdefault("directories", {})
_deep_merge(defaults["directories"], dirs)
repos = data.get("repositories")
if isinstance(repos, list):
defaults.setdefault("repositories", [])
_merge_repo_lists(
defaults["repositories"],
repos,
category_name=category_name,
)
return defaults
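The file-selection rule used here (only `*.yml`/`*.yaml`, the user config excluded, loaded in sorted order) can be exercised in isolation; this sketch only demonstrates the selection, not the YAML parsing, and the file names are made up:

```python
# Standalone sketch of _load_layer_dir's file selection: pick
# *.yml/*.yaml, skip the user config file, and sort by name so the
# layering order is deterministic.
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    config_dir = Path(tmp)
    for name in ("tools.yml", "python.yaml", "config.yaml", "notes.txt"):
        (config_dir / name).write_text("repositories: []\n", encoding="utf-8")
    skip_filename = "config.yaml"  # the user config is layered separately
    yaml_files = sorted(
        (p for p in config_dir.iterdir()
         if p.is_file()
         and p.suffix.lower() in (".yml", ".yaml")
         and p.name != skip_filename),
        key=lambda p: p.name,
    )
    print([p.name for p in yaml_files])  # → ['python.yaml', 'tools.yml']
```

`notes.txt` is ignored for its suffix and `config.yaml` for its name, leaving the two category files in name order.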
def _load_defaults_from_package_or_project() -> Dict[str, Any]:
"""
Fallback: try to load defaults from the installed package OR
from the project root:
<pkg_root>/config_defaults
<pkg_root>/config
<project_root>/config_defaults
<project_root>/config
"""
try:
import pkgmgr # type: ignore
except Exception:
return {"directories": {}, "repositories": []}
pkg_root = Path(pkgmgr.__file__).resolve().parent
project_root = pkg_root.parent
candidates = [
pkg_root / "config_defaults",
pkg_root / "config",
project_root / "config_defaults",
project_root / "config",
]
for cand in candidates:
defaults = _load_layer_dir(cand, skip_filename=None)
if defaults["directories"] or defaults["repositories"]:
return defaults
return {"directories": {}, "repositories": []}
# ---------------------------------------------------------------------------
# Hauptfunktion
# ---------------------------------------------------------------------------
def load_config(user_config_path: str) -> Dict[str, Any]:
"""
Load and merge configuration for pkgmgr.
Steps:
1. Determine ~/.config/pkgmgr/ (or the directory of user_config_path).
2. Load all *.yml/*.yaml files there (except the user config itself)
as defaults / category layers.
3. If nothing is found there, fall back to the package/project defaults.
4. Load the user config file itself (if present).
5. Merge:
- directories: deep-merge (defaults <- user)
- repositories: _merge_repo_lists (defaults <- user)
"""
user_config_path_expanded = os.path.expanduser(user_config_path)
user_cfg_path = Path(user_config_path_expanded)
config_dir = user_cfg_path.parent
if str(config_dir) in ("", "."):
# Fallback in case someone passes just "config.yaml":
# Path("config.yaml").parent is ".", never the empty string.
config_dir = Path(os.path.expanduser("~/.config/pkgmgr"))
config_dir.mkdir(parents=True, exist_ok=True)
user_cfg_name = user_cfg_path.name
# 1+2) Defaults / category layers from the user config directory
defaults = _load_layer_dir(config_dir, skip_filename=user_cfg_name)
# 3) If nothing was found there, fall back to package/project defaults
if not defaults["directories"] and not defaults["repositories"]:
defaults = _load_defaults_from_package_or_project()
defaults.setdefault("directories", {})
defaults.setdefault("repositories", [])
# 4) User config
user_cfg: Dict[str, Any] = {}
if user_cfg_path.is_file():
user_cfg = _load_yaml_file(user_cfg_path)
user_cfg.setdefault("directories", {})
user_cfg.setdefault("repositories", [])
# 5) Merge: directories via deep-merge, repositories via list-merge
merged: Dict[str, Any] = {}
# directories
merged["directories"] = {}
_deep_merge(merged["directories"], defaults["directories"])
_deep_merge(merged["directories"], user_cfg["directories"])
# repositories
merged["repositories"] = []
_merge_repo_lists(merged["repositories"], defaults["repositories"], category_name=None)
_merge_repo_lists(merged["repositories"], user_cfg["repositories"], category_name=None)
# other top-level keys (if any)
other_keys = (set(defaults.keys()) | set(user_cfg.keys())) - {
"directories",
"repositories",
}
for key in other_keys:
base_val = defaults.get(key)
override_val = user_cfg.get(key)
if isinstance(base_val, dict) and isinstance(override_val, dict):
merged[key] = _deep_merge(dict(base_val), override_val)
elif override_val is not None:
merged[key] = override_val
else:
merged[key] = base_val
return merged
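The precedence rule for those remaining top-level keys can be condensed into a small sketch (a simplified shallow stand-in for the real deep merge; function name and sample values are illustrative):

```python
# Sketch of the precedence for top-level keys other than
# "directories"/"repositories": two dicts are merged (user wins per
# key), otherwise the user value wins when present, else the default
# survives.
def merge_other(default, user):
    if isinstance(default, dict) and isinstance(user, dict):
        merged = dict(default)
        merged.update(user)  # shallow stand-in for the deep merge
        return merged
    return user if user is not None else default

print(merge_other({"a": 1}, {"b": 2}))  # → {'a': 1, 'b': 2}
print(merge_other("defaults", None))    # → defaults
print(merge_other(None, "user"))        # → user
```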


@@ -7,7 +7,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "package-manager"
version = "0.4.2"
version = "0.5.1"
description = "Kevin's package-manager tool (pkgmgr)"
readme = "README.md"
requires-python = ">=3.11"


@@ -0,0 +1,64 @@
from __future__ import annotations
import os
import runpy
import sys
import unittest
from test_integration_version_commands import PROJECT_ROOT
class TestIntegrationListCommands(unittest.TestCase):
"""
Integration tests for `pkgmgr list` with the new selection and
description behaviour.
"""
def _run_pkgmgr(self, args: list[str], cwd: str | None = None) -> None:
cmd_repr = "pkgmgr " + " ".join(args)
original_argv = list(sys.argv)
original_cwd = os.getcwd()
try:
if cwd is not None:
os.chdir(cwd)
# Simulate: pkgmgr <args...>
sys.argv = ["pkgmgr"] + args
try:
runpy.run_module("main", run_name="__main__")
except SystemExit as exc:
code = exc.code if isinstance(exc.code, int) else str(exc.code)
if code != 0:
print()
print(f"[TEST] Command : {cmd_repr}")
print(f"[TEST] Working directory: {os.getcwd()}")
print(f"[TEST] Exit code : {code}")
raise AssertionError(
f"{cmd_repr!r} failed with exit code {code}. "
"Scroll up to inspect the output printed before failure."
) from exc
finally:
os.chdir(original_cwd)
sys.argv = original_argv
def test_list_all_repositories(self) -> None:
"""
`pkgmgr list --all` should successfully print the summary table.
"""
self._run_pkgmgr(["list", "--all"], cwd=PROJECT_ROOT)
def test_list_all_with_description(self) -> None:
"""
`pkgmgr list --all --description` should print the table plus the
detailed section for each repository.
"""
self._run_pkgmgr(["list", "--all", "--description"], cwd=PROJECT_ROOT)
def test_list_with_string_filter(self) -> None:
"""
`pkgmgr list --string pkgmgr` exercises the new string-based
selection logic on top of the defaults + user config.
"""
self._run_pkgmgr(["list", "--string", "pkgmgr"], cwd=PROJECT_ROOT)


@@ -0,0 +1,65 @@
from __future__ import annotations
import os
import runpy
import sys
import unittest
from test_integration_version_commands import PROJECT_ROOT
class TestIntegrationProxyCommands(unittest.TestCase):
"""
Integration tests for proxy commands (e.g. git pull) using the new
selection logic and `--preview` mode so no real changes are made.
"""
def _run_pkgmgr(self, args: list[str], cwd: str | None = None) -> None:
cmd_repr = "pkgmgr " + " ".join(args)
original_argv = list(sys.argv)
original_cwd = os.getcwd()
try:
if cwd is not None:
os.chdir(cwd)
# Simulate: pkgmgr <args...>
sys.argv = ["pkgmgr"] + args
try:
runpy.run_module("main", run_name="__main__")
except SystemExit as exc:
code = exc.code if isinstance(exc.code, int) else str(exc.code)
if code != 0:
print()
print(f"[TEST] Command : {cmd_repr}")
print(f"[TEST] Working directory: {os.getcwd()}")
print(f"[TEST] Exit code : {code}")
raise AssertionError(
f"{cmd_repr!r} failed with exit code {code}. "
"Scroll up to inspect the output printed before failure."
) from exc
finally:
os.chdir(original_cwd)
sys.argv = original_argv
def test_git_pull_preview_for_pkgmgr(self) -> None:
"""
`pkgmgr pull --preview pkgmgr` should go through the proxy layer,
use get_selected_repos() and only print the underlying git pull
command without executing it.
"""
self._run_pkgmgr(
["pull", "--preview", "pkgmgr"],
cwd=PROJECT_ROOT,
)
def test_git_pull_preview_with_string_filter(self) -> None:
"""
`pkgmgr pull --preview --string pkgmgr` exercises the proxy +
filter-only selection path.
"""
self._run_pkgmgr(
["pull", "--preview", "--string", "pkgmgr"],
cwd=PROJECT_ROOT,
)


@@ -4,10 +4,17 @@
"""
End-to-end style integration tests for the `pkgmgr release` CLI command.
These tests exercise the top-level `pkgmgr` entry point by invoking
the module as `__main__` and verifying that the underlying
`pkgmgr.release.release()` function is called with the expected
arguments, in particular the new `close` flag.
These tests exercise the real top-level entry point (main.py) and mock
the high-level helper used by the CLI wiring
(pkgmgr.cli_core.commands.release.run_release) to ensure that argument
parsing and dispatch behave as expected, in particular the new `close`
flag.
The tests simulate real CLI calls like:
pkgmgr release minor --preview --close
by manipulating sys.argv and executing main.py as __main__ via runpy.
"""
from __future__ import annotations
@@ -21,55 +28,110 @@ from unittest.mock import patch
class TestIntegrationReleaseCommand(unittest.TestCase):
"""Integration tests for `pkgmgr release` wiring."""
def _run_pkgmgr(self, argv: list[str]) -> None:
def _run_pkgmgr(self, extra_args: list[str]) -> None:
"""
Helper to invoke the `pkgmgr` console script via `run_module`.
Helper to invoke the `pkgmgr` console script via the real
entry point (main.py).
This simulates a real CLI call like:
pkgmgr release minor --preview --close
pkgmgr <extra_args...>
by setting sys.argv accordingly and executing main.py as
__main__ using runpy.run_module.
"""
original_argv = list(sys.argv)
try:
sys.argv = argv
# Entry point: the `pkgmgr` module is the console script.
runpy.run_module("pkgmgr", run_name="__main__")
# argv[0] is the program name; the rest are CLI arguments.
sys.argv = ["pkgmgr"] + list(extra_args)
runpy.run_module("main", run_name="__main__")
finally:
sys.argv = original_argv
@patch("pkgmgr.release.release")
def test_release_without_close_flag(self, mock_release) -> None:
# ------------------------------------------------------------------
# Behaviour without --close
# ------------------------------------------------------------------
@patch("pkgmgr.cli_core.commands.release.run_release")
@patch("pkgmgr.cli_core.dispatch._select_repo_for_current_directory")
def test_release_without_close_flag(
self,
mock_select_repo,
mock_run_release,
) -> None:
"""
Calling `pkgmgr release patch --preview` should *not* enable
the `close` flag by default.
"""
self._run_pkgmgr(["pkgmgr", "release", "patch", "--preview"])
# Ensure that the dispatch layer always selects a repository,
# independent of any real config in the test environment.
mock_select_repo.return_value = [
{
"directory": ".",
"provider": "local",
"account": "test",
"repository": "dummy",
}
]
mock_release.assert_called_once()
_args, kwargs = mock_release.call_args
self._run_pkgmgr(["release", "patch", "--preview"])
mock_run_release.assert_called_once()
_args, kwargs = mock_run_release.call_args
# CLI wiring
self.assertEqual(kwargs.get("release_type"), "patch")
self.assertTrue(kwargs.get("preview"), "preview should be True when --preview is used")
self.assertTrue(
kwargs.get("preview"),
"preview should be True when --preview is used",
)
# Default: no --close → close=False
self.assertFalse(kwargs.get("close"), "close must be False when --close is not given")
self.assertFalse(
kwargs.get("close"),
"close must be False when --close is not given",
)
@patch("pkgmgr.release.release")
def test_release_with_close_flag(self, mock_release) -> None:
# ------------------------------------------------------------------
# Behaviour with --close
# ------------------------------------------------------------------
@patch("pkgmgr.cli_core.commands.release.run_release")
@patch("pkgmgr.cli_core.dispatch._select_repo_for_current_directory")
def test_release_with_close_flag(
self,
mock_select_repo,
mock_run_release,
) -> None:
"""
Calling `pkgmgr release minor --preview --close` should pass
close=True into pkgmgr.release.release().
close=True into the helper used by the CLI wiring.
"""
self._run_pkgmgr(["pkgmgr", "release", "minor", "--preview", "--close"])
# Again: make sure there is always a selected repository.
mock_select_repo.return_value = [
{
"directory": ".",
"provider": "local",
"account": "test",
"repository": "dummy",
}
]
mock_release.assert_called_once()
_args, kwargs = mock_release.call_args
self._run_pkgmgr(["release", "minor", "--preview", "--close"])
mock_run_release.assert_called_once()
_args, kwargs = mock_run_release.call_args
# CLI wiring
self.assertEqual(kwargs.get("release_type"), "minor")
self.assertTrue(kwargs.get("preview"), "preview should be True when --preview is used")
self.assertTrue(
kwargs.get("preview"),
"preview should be True when --preview is used",
)
# With --close → close=True
self.assertTrue(kwargs.get("close"), "close must be True when --close is given")
self.assertTrue(
kwargs.get("close"),
"close must be True when --close is given",
)
if __name__ == "__main__":


@@ -0,0 +1,112 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Unit tests for the `pkgmgr branch` CLI wiring.
These tests verify that:
- The argument parser creates the correct structure for
`branch open` and `branch close`.
- `handle_branch` calls the corresponding helper functions
with the expected arguments (including base branch and cwd).
"""
from __future__ import annotations
import unittest
from unittest.mock import patch
from pkgmgr.cli_core.parser import create_parser
from pkgmgr.cli_core.commands.branch import handle_branch
class TestBranchCLI(unittest.TestCase):
"""
Tests for the branch subcommands implemented in cli_core.
"""
def _create_parser(self):
"""
Create the top-level parser with a minimal description.
"""
return create_parser("pkgmgr test parser")
@patch("pkgmgr.cli_core.commands.branch.open_branch")
def test_branch_open_with_name_and_base(self, mock_open_branch):
"""
Ensure that `pkgmgr branch open <name> --base <branch>` calls
open_branch() with the correct parameters.
"""
parser = self._create_parser()
args = parser.parse_args(
["branch", "open", "feature/test-branch", "--base", "develop"]
)
# Sanity check: parser wiring
self.assertEqual(args.command, "branch")
self.assertEqual(args.subcommand, "open")
self.assertEqual(args.name, "feature/test-branch")
self.assertEqual(args.base, "develop")
# ctx is currently unused by handle_branch, so we can pass None
handle_branch(args, ctx=None)
mock_open_branch.assert_called_once()
_args, kwargs = mock_open_branch.call_args
self.assertEqual(kwargs.get("name"), "feature/test-branch")
self.assertEqual(kwargs.get("base_branch"), "develop")
self.assertEqual(kwargs.get("cwd"), ".")
@patch("pkgmgr.cli_core.commands.branch.close_branch")
def test_branch_close_with_name_and_base(self, mock_close_branch):
"""
Ensure that `pkgmgr branch close <name> --base <branch>` calls
close_branch() with the correct parameters.
"""
parser = self._create_parser()
args = parser.parse_args(
["branch", "close", "feature/old-branch", "--base", "main"]
)
# Sanity check: parser wiring
self.assertEqual(args.command, "branch")
self.assertEqual(args.subcommand, "close")
self.assertEqual(args.name, "feature/old-branch")
self.assertEqual(args.base, "main")
handle_branch(args, ctx=None)
mock_close_branch.assert_called_once()
_args, kwargs = mock_close_branch.call_args
self.assertEqual(kwargs.get("name"), "feature/old-branch")
self.assertEqual(kwargs.get("base_branch"), "main")
self.assertEqual(kwargs.get("cwd"), ".")
@patch("pkgmgr.cli_core.commands.branch.close_branch")
def test_branch_close_without_name_uses_none(self, mock_close_branch):
"""
Ensure that `pkgmgr branch close` without a name passes name=None
into close_branch(), leaving branch resolution to the helper.
"""
parser = self._create_parser()
args = parser.parse_args(["branch", "close"])
# Parser wiring: no name → None
self.assertEqual(args.command, "branch")
self.assertEqual(args.subcommand, "close")
self.assertIsNone(args.name)
handle_branch(args, ctx=None)
mock_close_branch.assert_called_once()
_args, kwargs = mock_close_branch.call_args
self.assertIsNone(kwargs.get("name"))
self.assertEqual(kwargs.get("base_branch"), "main")
self.assertEqual(kwargs.get("cwd"), ".")
if __name__ == "__main__":
unittest.main()


@@ -42,7 +42,7 @@ def _fake_config() -> Dict[str, Any]:
"workspaces": "/tmp/pkgmgr-workspaces",
},
# The actual list of repositories is not used directly by the tests,
# because we mock get_selected_repos(). It must exist, though.
# because we mock the selection logic. It must exist, though.
"repositories": [],
}
@@ -54,8 +54,9 @@ class TestCliVersion(unittest.TestCase):
Each test:
- Runs in a temporary working directory.
- Uses a fake configuration via load_config().
- Uses a mocked get_selected_repos() that returns a single repo
pointing to the temporary directory.
- Uses the same selection logic as the new CLI:
* dispatch_command() calls _select_repo_for_current_directory()
when there is no explicit selection.
"""
def setUp(self) -> None:
@@ -64,35 +65,30 @@ class TestCliVersion(unittest.TestCase):
self._old_cwd = os.getcwd()
os.chdir(self._tmp_dir.name)
# Define a fake repo pointing to our temp dir
self._fake_repo = {
"provider": "github.com",
"account": "test",
"repository": "pkgmgr-test",
"directory": self._tmp_dir.name,
}
# Patch load_config so cli.main() does not read real config files
self._patch_load_config = mock.patch(
"pkgmgr.cli.load_config", return_value=_fake_config()
)
self.mock_load_config = self._patch_load_config.start()
# Patch get_selected_repos so that 'version' operates on our temp dir.
# In the new modular CLI this function is used inside
# pkgmgr.cli_core.dispatch, so we patch it there.
def _fake_selected_repos(
all_flag: bool,
repos: List[dict],
identifiers: List[str],
):
# We always return exactly one "repository" whose directory is the temp dir.
return [
{
"provider": "github.com",
"account": "test",
"repository": "pkgmgr-test",
"directory": self._tmp_dir.name,
}
]
self._patch_get_selected_repos = mock.patch(
"pkgmgr.cli_core.dispatch.get_selected_repos",
side_effect=_fake_selected_repos,
# Patch the "current directory" selection used by dispatch_command().
# This matches the new behaviour: without explicit identifiers,
# version uses _select_repo_for_current_directory(ctx).
self._patch_select_repo_for_current_directory = mock.patch(
"pkgmgr.cli_core.dispatch._select_repo_for_current_directory",
return_value=[self._fake_repo],
)
self.mock_select_repo_for_current_directory = (
self._patch_select_repo_for_current_directory.start()
)
self.mock_get_selected_repos = self._patch_get_selected_repos.start()
# Keep a reference to the original sys.argv, so we can restore it
self._old_argv = list(sys.argv)
@@ -102,7 +98,7 @@ class TestCliVersion(unittest.TestCase):
sys.argv = self._old_argv
# Stop all patches
self._patch_get_selected_repos.stop()
self._patch_select_repo_for_current_directory.stop()
self._patch_load_config.stop()
# Restore working directory
@@ -228,7 +224,7 @@ class TestCliVersion(unittest.TestCase):
# Arrange: pyproject.toml exists
self._write_pyproject("0.0.1")
# Arrange: no tags returned (again: patch handle_version's get_tags)
# Arrange: no tags returned
with mock.patch(
"pkgmgr.cli_core.commands.version.get_tags",
return_value=[],