diff --git a/CHANGELOG.md b/CHANGELOG.md index ebd860b..a128505 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +## [0.4.3] - 2025-12-09 + +* Implement current-directory repository selection for release and proxy commands, unify selection semantics across CLI layers, extend release workflow with --close, integrate branch closing logic, fix wiring for get_repo_identifier/get_repo_dir, update packaging files (PKGBUILD, spec, flake.nix, pyproject), and add comprehensive unit/e2e tests for release and branch commands (see ChatGPT conversation: https://chatgpt.com/share/69375cfe-9e00-800f-bd65-1bd5937e1696) + + ## [0.4.2] - 2025-12-09 * Wire pkgmgr release CLI to new helper and add unit tests (see ChatGPT conversation: https://chatgpt.com/share/69374f09-c760-800f-92e4-5b44a4510b62) @@ -12,6 +17,17 @@ * Add branch closing helper and --close flag to release command, including CLI wiring and tests (see https://chatgpt.com/share/69374aec-74ec-800f-bde3-5d91dfdb9b91) +## [0.3.0] - 2025-12-08 + +* Massive refactor and feature expansion: +- Complete rewrite of config loading system (layered defaults + user config) +- New selection engine (--string, --category, --tag) +- Overhauled list output (colored statuses, alias highlight) +- New config update logic + default YAML sync +- Improved proxy command handling +- Full CLI routing refactor +- Expanded E2E tests for list, proxy, and selection logic +Konversation: https://chatgpt.com/share/693745c3-b8d8-800f-aa29-c8481a2ffae1 ## [0.2.0] - 2025-12-08 diff --git a/PKGBUILD b/PKGBUILD index 8fb1b10..43c7a7f 100644 --- a/PKGBUILD +++ b/PKGBUILD @@ -1,7 +1,7 @@ # Maintainer: Kevin Veen-Birkenbach pkgname=package-manager -pkgver=0.4.2 +pkgver=0.4.3 pkgrel=1 pkgdesc="Local-flake wrapper for Kevin's package-manager (Nix-based)." arch=('any') diff --git a/config/wip.yml b/config/wip.yml new file mode 100644 index 0000000..abbf8f1 --- /dev/null +++ b/config/wip.yml @@ -0,0 +1,7 @@ +- account: kevinveenbirkenbach + alias: gkfdrtdtcntr + provider: github.com + repository: federated-to-central-social-network-bridge + verified: + gpg_keys: + - 44D8F11FD62F878E \ No newline at end of file diff --git a/debian/changelog b/debian/changelog index 20e1e28..b926f13 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,9 @@ +package-manager (0.4.3-1) unstable; urgency=medium + + * Implement current-directory repository selection for release and proxy commands, unify selection semantics across CLI layers, extend release workflow with --close, integrate branch closing logic, fix wiring for get_repo_identifier/get_repo_dir, update packaging files (PKGBUILD, spec, flake.nix, pyproject), and add comprehensive unit/e2e tests for release and branch commands (see ChatGPT conversation: https://chatgpt.com/share/69375cfe-9e00-800f-bd65-1bd5937e1696) + + -- Kevin Veen-Birkenbach Tue, 09 Dec 2025 00:29:08 +0100 + package-manager (0.4.2-1) unstable; urgency=medium * Wire pkgmgr release CLI to new helper and add unit tests (see ChatGPT conversation: https://chatgpt.com/share/69374f09-c760-800f-92e4-5b44a4510b62) @@ -16,6 +22,20 @@ package-manager (0.4.0-1) unstable; urgency=medium -- Kevin Veen-Birkenbach Mon, 08 Dec 2025 23:02:43 +0100 +package-manager (0.3.0-1) unstable; urgency=medium + + * Massive refactor and feature expansion: +- Complete rewrite of config loading system (layered defaults + user config) +- New selection engine (--string, --category, --tag) +- Overhauled list output (colored statuses, alias highlight) +- New config update logic + default YAML sync +- Improved proxy command handling +- Full CLI routing refactor +- Expanded E2E tests for list, proxy, and selection logic +Konversation: https://chatgpt.com/share/693745c3-b8d8-800f-aa29-c8481a2ffae1 + + -- Kevin Veen-Birkenbach Mon, 08 Dec 2025 22:40:49 +0100 + package-manager (0.2.0-1) unstable; urgency=medium * Add preview-first release workflow and extended packaging support (see ChatGPT conversation: https://chatgpt.com/share/693722b4-af9c-800f-bccc-8a4036e99630) diff --git a/flake.nix b/flake.nix index b49907b..75510bf 100644 --- a/flake.nix +++ b/flake.nix @@ -31,7 +31,7 @@ rec { pkgmgr = pyPkgs.buildPythonApplication { pname = "package-manager"; - version = "0.4.2"; + version = "0.4.3"; # Use the git repo as source src = ./.; diff --git a/package-manager.spec b/package-manager.spec index 6446066..fd3db8b 100644 --- a/package-manager.spec +++ b/package-manager.spec @@ -1,5 +1,5 @@ Name: package-manager -Version: 0.4.2 +Version: 0.4.3 Release: 1%{?dist} Summary: Wrapper that runs Kevin's package-manager via Nix flake diff --git a/pkgmgr/cli.py b/pkgmgr/cli.py index 1f96453..278728c 100755 --- a/pkgmgr/cli.py +++ b/pkgmgr/cli.py @@ -9,9 +9,9 @@ import sys from pkgmgr.load_config import load_config from pkgmgr.cli_core import CLIContext, create_parser, dispatch_command -# Define configuration file paths. -PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.realpath(__file__))) -USER_CONFIG_PATH = os.path.join(PROJECT_ROOT, "config", "config.yaml") +# User config lives in the home directory: +# ~/.config/pkgmgr/config.yaml +USER_CONFIG_PATH = os.path.expanduser("~/.config/pkgmgr/config.yaml") DESCRIPTION_TEXT = """\ \033[1;32mPackage Manager 🤖📦\033[0m @@ -63,20 +63,31 @@ For detailed help on each command, use: def main() -> None: - # Load merged configuration + """ + Entry point for the pkgmgr CLI. + """ + config_merged = load_config(USER_CONFIG_PATH) - repositories_base_dir = os.path.expanduser( - config_merged["directories"]["repositories"] + # Directories: be robust and provide sane defaults if missing + directories = config_merged.get("directories") or {} + repositories_dir = os.path.expanduser( + directories.get("repositories", "~/Repositories") ) binaries_dir = os.path.expanduser( - config_merged["directories"]["binaries"] + directories.get("binaries", "~/.local/bin") ) - all_repositories = config_merged["repositories"] + + # Ensure the merged config actually contains the resolved directories + config_merged.setdefault("directories", {}) + config_merged["directories"]["repositories"] = repositories_dir + config_merged["directories"]["binaries"] = binaries_dir + + all_repositories = config_merged.get("repositories", []) ctx = CLIContext( config_merged=config_merged, - repositories_base_dir=repositories_base_dir, + repositories_base_dir=repositories_dir, all_repositories=all_repositories, binaries_dir=binaries_dir, user_config_path=USER_CONFIG_PATH, @@ -85,7 +96,6 @@ def main() -> None: parser = create_parser(DESCRIPTION_TEXT) args = parser.parse_args() - # If no subcommand is provided, show help if not getattr(args, "command", None): parser.print_help() return diff --git a/pkgmgr/cli_core/commands/config.py b/pkgmgr/cli_core/commands/config.py index 6610aff..478ee03 100644 --- a/pkgmgr/cli_core/commands/config.py +++ b/pkgmgr/cli_core/commands/config.py @@ -1,8 +1,13 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + from __future__ import annotations import os import sys -from typing import Any, Dict, List +import shutil +from pathlib import Path +from typing import Any, Dict import yaml @@ -17,29 +22,103 @@ from pkgmgr.run_command import run_command def _load_user_config(user_config_path: str) -> Dict[str, Any]: """ - Load the user config file, returning a default structure if it does not exist. + Load the user config from ~/.config/pkgmgr/config.yaml + (or whatever ctx.user_config_path is), creating the directory if needed. """ - if os.path.exists(user_config_path): - with open(user_config_path, "r") as f: + user_config_path_expanded = os.path.expanduser(user_config_path) + cfg_dir = os.path.dirname(user_config_path_expanded) + if cfg_dir and not os.path.isdir(cfg_dir): + os.makedirs(cfg_dir, exist_ok=True) + + if os.path.exists(user_config_path_expanded): + with open(user_config_path_expanded, "r", encoding="utf-8") as f: return yaml.safe_load(f) or {"repositories": []} return {"repositories": []} +def _find_defaults_source_dir() -> str | None: + """ + Find the directory inside the installed pkgmgr package OR the + project root that contains default config files. + + Preferred locations (in dieser Reihenfolge): + - /config_defaults + - /config + - /config_defaults + - /config + """ + import pkgmgr # local import to avoid circular deps + + pkg_root = Path(pkgmgr.__file__).resolve().parent + project_root = pkg_root.parent + + candidates = [ + pkg_root / "config_defaults", + pkg_root / "config", + project_root / "config_defaults", + project_root / "config", + ] + for cand in candidates: + if cand.is_dir(): + return str(cand) + return None + + +def _update_default_configs(user_config_path: str) -> None: + """ + Copy all default *.yml/*.yaml files from the installed pkgmgr package + into ~/.config/pkgmgr/, overwriting existing ones – except the user + config file itself (config.yaml), which is never touched. + """ + source_dir = _find_defaults_source_dir() + if not source_dir: + print( + "[WARN] No config_defaults or config directory found in " + "pkgmgr installation. Nothing to update." + ) + return + + dest_dir = os.path.dirname(os.path.expanduser(user_config_path)) + if not dest_dir: + dest_dir = os.path.expanduser("~/.config/pkgmgr") + os.makedirs(dest_dir, exist_ok=True) + + for name in os.listdir(source_dir): + lower = name.lower() + if not (lower.endswith(".yml") or lower.endswith(".yaml")): + continue + if name == "config.yaml": + # Never overwrite the user config template / live config + continue + + src = os.path.join(source_dir, name) + dst = os.path.join(dest_dir, name) + + shutil.copy2(src, dst) + print(f"[INFO] Updated default config file: {dst}") + + def handle_config(args, ctx: CLIContext) -> None: """ - Handle the 'config' command and its subcommands. + Handle 'pkgmgr config' subcommands. """ user_config_path = ctx.user_config_path - # -------------------------------------------------------- + # ------------------------------------------------------------ # config show - # -------------------------------------------------------- + # ------------------------------------------------------------ if args.subcommand == "show": if args.all or (not args.identifiers): + # Full merged config view show_config([], user_config_path, full_config=True) else: - selected = resolve_repos(args.identifiers, ctx.all_repositories) + # Show only matching entries from user config + user_config = _load_user_config(user_config_path) + selected = resolve_repos( + args.identifiers, + user_config.get("repositories", []), + ) if selected: show_config( selected, @@ -48,23 +127,23 @@ def handle_config(args, ctx: CLIContext) -> None: ) return - # -------------------------------------------------------- + # ------------------------------------------------------------ # config add - # -------------------------------------------------------- + # ------------------------------------------------------------ if args.subcommand == "add": interactive_add(ctx.config_merged, user_config_path) return - # -------------------------------------------------------- + # ------------------------------------------------------------ # config edit - # -------------------------------------------------------- + # ------------------------------------------------------------ if args.subcommand == "edit": run_command(f"nano {user_config_path}") return - # -------------------------------------------------------- + # ------------------------------------------------------------ # config init - # -------------------------------------------------------- + # ------------------------------------------------------------ if args.subcommand == "init": user_config = _load_user_config(user_config_path) config_init( @@ -75,14 +154,17 @@ def handle_config(args, ctx: CLIContext) -> None: ) return - # -------------------------------------------------------- + # ------------------------------------------------------------ # config delete - # -------------------------------------------------------- + # ------------------------------------------------------------ if args.subcommand == "delete": user_config = _load_user_config(user_config_path) if args.all or not args.identifiers: - print("You must specify identifiers to delete.") + print( + "[ERROR] 'config delete' requires explicit identifiers. " + "Use 'config show' to inspect entries." + ) return to_delete = resolve_repos( @@ -99,14 +181,17 @@ def handle_config(args, ctx: CLIContext) -> None: print(f"Deleted {len(to_delete)} entries from user config.") return - # -------------------------------------------------------- + # ------------------------------------------------------------ # config ignore - # -------------------------------------------------------- + # ------------------------------------------------------------ if args.subcommand == "ignore": user_config = _load_user_config(user_config_path) if args.all or not args.identifiers: - print("You must specify identifiers to modify ignore flag.") + print( + "[ERROR] 'config ignore' requires explicit identifiers. " + "Use 'config show' to inspect entries." + ) return to_modify = resolve_repos( @@ -135,6 +220,21 @@ def handle_config(args, ctx: CLIContext) -> None: save_user_config(user_config, user_config_path) return - # If we end up here, something is wrong with subcommand routing + # ------------------------------------------------------------ + # config update + # ------------------------------------------------------------ + if args.subcommand == "update": + """ + Copy default YAML configs from the installed package into the + user's ~/.config/pkgmgr directory. + + This will overwrite files with the same name (except config.yaml). + """ + _update_default_configs(user_config_path) + return + + # ------------------------------------------------------------ + # Unknown subcommand + # ------------------------------------------------------------ print(f"Unknown config subcommand: {args.subcommand}") sys.exit(2) diff --git a/pkgmgr/cli_core/commands/repos.py b/pkgmgr/cli_core/commands/repos.py index 08795b4..3944293 100644 --- a/pkgmgr/cli_core/commands/repos.py +++ b/pkgmgr/cli_core/commands/repos.py @@ -1,3 +1,6 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + from __future__ import annotations import sys @@ -12,7 +15,7 @@ from pkgmgr.status_repos import status_repos from pkgmgr.list_repositories import list_repositories from pkgmgr.run_command import run_command from pkgmgr.create_repo import create_repo - +from pkgmgr.get_selected_repos import get_selected_repos Repository = Dict[str, Any] @@ -23,15 +26,12 @@ def handle_repos_command( selected: List[Repository], ) -> None: """ - Handle repository-related commands: - - install / update / deinstall / delete / status - - path / shell - - create / list + Handle core repository commands (install/update/deinstall/delete/.../list). """ - # -------------------------------------------------------- - # install / update - # -------------------------------------------------------- + # ------------------------------------------------------------ + # install + # ------------------------------------------------------------ if args.command == "install": install_repos( selected, @@ -46,6 +46,9 @@ def handle_repos_command( ) return + # ------------------------------------------------------------ + # update + # ------------------------------------------------------------ if args.command == "update": update_repos( selected, @@ -61,9 +64,9 @@ def handle_repos_command( ) return - # -------------------------------------------------------- - # deinstall / delete - # -------------------------------------------------------- + # ------------------------------------------------------------ + # deinstall + # ------------------------------------------------------------ if args.command == "deinstall": deinstall_repos( selected, @@ -74,6 +77,9 @@ def handle_repos_command( ) return + # ------------------------------------------------------------ + # delete + # ------------------------------------------------------------ if args.command == "delete": delete_repos( selected, @@ -83,9 +89,9 @@ def handle_repos_command( ) return - # -------------------------------------------------------- + # ------------------------------------------------------------ # status - # -------------------------------------------------------- + # ------------------------------------------------------------ if args.command == "status": status_repos( selected, @@ -98,20 +104,20 @@ def handle_repos_command( ) return - # -------------------------------------------------------- + # ------------------------------------------------------------ # path - # -------------------------------------------------------- + # ------------------------------------------------------------ if args.command == "path": for repository in selected: print(repository["directory"]) return - # -------------------------------------------------------- + # ------------------------------------------------------------ # shell - # -------------------------------------------------------- + # ------------------------------------------------------------ if args.command == "shell": if not args.shell_command: - print("No shell command specified.") + print("[ERROR] 'shell' requires a command via -c/--command.") sys.exit(2) command_to_run = " ".join(args.shell_command) for repository in selected: @@ -125,13 +131,13 @@ def handle_repos_command( ) return - # -------------------------------------------------------- + # ------------------------------------------------------------ # create - # -------------------------------------------------------- + # ------------------------------------------------------------ if args.command == "create": if not args.identifiers: print( - "No identifiers provided. Please specify at least one identifier " + "[ERROR] 'create' requires at least one identifier " "in the format provider/account/repository." ) sys.exit(1) @@ -147,15 +153,19 @@ def handle_repos_command( ) return - # -------------------------------------------------------- + # ------------------------------------------------------------ # list - # -------------------------------------------------------- + # ------------------------------------------------------------ if args.command == "list": list_repositories( - ctx.all_repositories, + selected, ctx.repositories_base_dir, ctx.binaries_dir, - search_filter=args.search, - status_filter=args.status, + status_filter=getattr(args, "status", "") or "", + extra_tags=getattr(args, "tag", []) or [], + show_description=getattr(args, "description", False), ) return + + print(f"[ERROR] Unknown repos command: {args.command}") + sys.exit(2) diff --git a/pkgmgr/cli_core/dispatch.py b/pkgmgr/cli_core/dispatch.py index 690982a..dd4f14a 100644 --- a/pkgmgr/cli_core/dispatch.py +++ b/pkgmgr/cli_core/dispatch.py @@ -1,11 +1,16 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + from __future__ import annotations +import os import sys -from typing import List +from typing import List, Dict, Any from pkgmgr.cli_core.context import CLIContext from pkgmgr.cli_core.proxy import maybe_handle_proxy from pkgmgr.get_selected_repos import get_selected_repos +from pkgmgr.get_repo_dir import get_repo_dir from pkgmgr.cli_core.commands import ( handle_repos_command, @@ -19,20 +24,73 @@ from pkgmgr.cli_core.commands import ( ) +def _has_explicit_selection(args) -> bool: + """ + Return True if the user explicitly selected repositories via + identifiers / --all / --category / --tag / --string. + """ + identifiers = getattr(args, "identifiers", []) or [] + use_all = getattr(args, "all", False) + categories = getattr(args, "category", []) or [] + tags = getattr(args, "tag", []) or [] + string_filter = getattr(args, "string", "") or "" + + return bool( + use_all + or identifiers + or categories + or tags + or string_filter + ) + + +def _select_repo_for_current_directory( + ctx: CLIContext, +) -> List[Dict[str, Any]]: + """ + Heuristic: find the repository whose local directory matches the + current working directory or is the closest parent. + + Example: + - Repo directory: /home/kevin/Repositories/foo + - CWD: /home/kevin/Repositories/foo/subdir + → 'foo' is selected. + """ + cwd = os.path.abspath(os.getcwd()) + candidates: List[tuple[str, Dict[str, Any]]] = [] + + for repo in ctx.all_repositories: + repo_dir = repo.get("directory") + if not repo_dir: + try: + repo_dir = get_repo_dir(ctx.repositories_base_dir, repo) + except Exception: + repo_dir = None + if not repo_dir: + continue + + repo_dir_abs = os.path.abspath(os.path.expanduser(repo_dir)) + if cwd == repo_dir_abs or cwd.startswith(repo_dir_abs + os.sep): + candidates.append((repo_dir_abs, repo)) + + if not candidates: + return [] + + # Pick the repo with the longest (most specific) path. + candidates.sort(key=lambda item: len(item[0]), reverse=True) + return [candidates[0][1]] + + def dispatch_command(args, ctx: CLIContext) -> None: """ - Top-level command dispatcher. - - Responsible for: - - computing selected repositories (where applicable) - - delegating to the correct command handler module + Dispatch the parsed arguments to the appropriate command handler. """ - # 1) Proxy commands (git, docker, docker compose) short-circuit. + # First: proxy commands (git / docker / docker compose / make wrapper etc.) if maybe_handle_proxy(args, ctx): return - # 2) Determine if this command uses repository selection. + # Commands that operate on repository selections commands_with_selection: List[str] = [ "install", "update", @@ -41,26 +99,33 @@ def dispatch_command(args, ctx: CLIContext) -> None: "status", "path", "shell", - "code", - "explore", - "terminal", + "create", + "list", + "make", "release", "version", - "make", "changelog", - # intentionally NOT "branch" – it operates on cwd only + "explore", + "terminal", + "code", ] - if args.command in commands_with_selection: - selected = get_selected_repos( - getattr(args, "all", False), - ctx.all_repositories, - getattr(args, "identifiers", []), - ) + if getattr(args, "command", None) in commands_with_selection: + if _has_explicit_selection(args): + # Classic selection logic (identifiers / --all / filters) + selected = get_selected_repos(args, ctx.all_repositories) + else: + # Default per help text: repository of current folder. + selected = _select_repo_for_current_directory(ctx) + # If none is found, leave 'selected' empty. + # Individual handlers will then emit a clear message instead + # of silently picking an unrelated repository. else: selected = [] - # 3) Delegate based on command. + # ------------------------------------------------------------------ # + # Repos-related commands + # ------------------------------------------------------------------ # if args.command in ( "install", "update", @@ -73,22 +138,41 @@ def dispatch_command(args, ctx: CLIContext) -> None: "list", ): handle_repos_command(args, ctx, selected) - elif args.command in ("code", "explore", "terminal"): + return + + # ------------------------------------------------------------------ # + # Tools (explore / terminal / code) + # ------------------------------------------------------------------ # + if args.command in ("explore", "terminal", "code"): handle_tools_command(args, ctx, selected) - elif args.command == "release": + return + + # ------------------------------------------------------------------ # + # Release / Version / Changelog / Config / Make / Branch + # ------------------------------------------------------------------ # + if args.command == "release": handle_release(args, ctx, selected) - elif args.command == "version": + return + + if args.command == "version": handle_version(args, ctx, selected) - elif args.command == "changelog": + return + + if args.command == "changelog": handle_changelog(args, ctx, selected) - elif args.command == "config": + return + + if args.command == "config": handle_config(args, ctx) - elif args.command == "make": + return + + if args.command == "make": handle_make(args, ctx, selected) - elif args.command == "branch": - # Branch commands currently operate on the current working - # directory only, not on the pkgmgr repository selection. + return + + if args.command == "branch": handle_branch(args, ctx) - else: - print(f"Unknown command: {args.command}") - sys.exit(2) + return + + print(f"Unknown command: {args.command}") + sys.exit(2) diff --git a/pkgmgr/cli_core/parser.py b/pkgmgr/cli_core/parser.py index 45716ab..4c57032 100644 --- a/pkgmgr/cli_core/parser.py +++ b/pkgmgr/cli_core/parser.py @@ -1,3 +1,6 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + from __future__ import annotations import argparse @@ -12,13 +15,20 @@ class SortedSubParsersAction(argparse._SubParsersAction): def add_parser(self, name, **kwargs): parser = super().add_parser(name, **kwargs) + # Sort choices alphabetically by dest (subcommand name) self._choices_actions.sort(key=lambda a: a.dest) return parser def add_identifier_arguments(subparser: argparse.ArgumentParser) -> None: """ - Attach generic repository selection arguments to a subparser. + Common identifier / selection arguments for many subcommands. + + Selection modes (mutual intent, not hard-enforced): + - identifiers (positional): select by alias / provider/account/repo + - --all: select all repositories + - --category / --string / --tag: filter-based selection on top + of the full repository set """ subparser.add_argument( "identifiers", @@ -39,6 +49,33 @@ def add_identifier_arguments(subparser: argparse.ArgumentParser) -> None: "yes | pkgmgr {subcommand} --all" ), ) + subparser.add_argument( + "--category", + nargs="+", + default=[], + help=( + "Filter repositories by category patterns derived from config " + "filenames or repo metadata (use filename without .yml/.yaml, " + "or /regex/ to use a regular expression)." + ), + ) + subparser.add_argument( + "--string", + default="", + help=( + "Filter repositories whose identifier / name / path contains this " + "substring (case-insensitive). Use /regex/ for regular expressions." + ), + ) + subparser.add_argument( + "--tag", + action="append", + default=[], + help=( + "Filter repositories by tag. Matches tags from the repository " + "collector and category tags. Use /regex/ for regular expressions." + ), + ) subparser.add_argument( "--preview", action="store_true", @@ -61,7 +98,7 @@ def add_identifier_arguments(subparser: argparse.ArgumentParser) -> None: def add_install_update_arguments(subparser: argparse.ArgumentParser) -> None: """ - Attach shared flags for install/update-like commands. + Common arguments for install/update commands. """ add_identifier_arguments(subparser) subparser.add_argument( @@ -94,10 +131,7 @@ def add_install_update_arguments(subparser: argparse.ArgumentParser) -> None: def create_parser(description_text: str) -> argparse.ArgumentParser: """ - Create and configure the top-level argument parser for pkgmgr. - - This function defines *only* the CLI surface (arguments & subcommands), - but no business logic. + Create the top-level argument parser for pkgmgr. """ parser = argparse.ArgumentParser( description=description_text, @@ -110,7 +144,7 @@ def create_parser(description_text: str) -> argparse.ArgumentParser: ) # ------------------------------------------------------------ - # install / update + # install / update / deinstall / delete # ------------------------------------------------------------ install_parser = subparsers.add_parser( "install", @@ -129,9 +163,6 @@ def create_parser(description_text: str) -> argparse.ArgumentParser: help="Include system update commands", ) - # ------------------------------------------------------------ - # deinstall / delete - # ------------------------------------------------------------ deinstall_parser = subparsers.add_parser( "deinstall", help="Remove alias links to repository/repositories", @@ -147,7 +178,7 @@ def create_parser(description_text: str) -> argparse.ArgumentParser: # ------------------------------------------------------------ # create # ------------------------------------------------------------ - create_parser = subparsers.add_parser( + create_cmd_parser = subparsers.add_parser( "create", help=( "Create new repository entries: add them to the config if not " @@ -155,8 +186,8 @@ def create_parser(description_text: str) -> argparse.ArgumentParser: "remotely if --remote is set." ), ) - add_identifier_arguments(create_parser) - create_parser.add_argument( + add_identifier_arguments(create_cmd_parser) + create_cmd_parser.add_argument( "--remote", action="store_true", help="If set, add the remote and push the initial commit.", @@ -228,6 +259,14 @@ def create_parser(description_text: str) -> argparse.ArgumentParser: help="Set ignore to true or false", ) + config_subparsers.add_parser( + "update", + help=( + "Update default config files in ~/.config/pkgmgr/ from the " + "installed pkgmgr package (does not touch config.yaml)." + ), + ) + # ------------------------------------------------------------ # path / explore / terminal / code / shell # ------------------------------------------------------------ @@ -265,7 +304,10 @@ def create_parser(description_text: str) -> argparse.ArgumentParser: "--command", nargs=argparse.REMAINDER, dest="shell_command", - help="The shell command (and its arguments) to execute in each repository", + help=( + "The shell command (and its arguments) to execute in each " + "repository" + ), default=[], ) @@ -289,7 +331,10 @@ def create_parser(description_text: str) -> argparse.ArgumentParser: branch_open.add_argument( "name", nargs="?", - help="Name of the new branch (optional; will be asked interactively if omitted)", + help=( + "Name of the new branch (optional; will be asked interactively " + "if omitted)" + ), ) branch_open.add_argument( "--base", @@ -349,7 +394,8 @@ def create_parser(description_text: str) -> argparse.ArgumentParser: "version", help=( "Show version information for repository/ies " - "(git tags, pyproject.toml, flake.nix, PKGBUILD, debian, spec, Ansible Galaxy)." + "(git tags, pyproject.toml, flake.nix, PKGBUILD, debian, spec, " + "Ansible Galaxy)." ), ) add_identifier_arguments(version_parser) @@ -383,20 +429,29 @@ def create_parser(description_text: str) -> argparse.ArgumentParser: "list", help="List all repositories with details and status", ) - list_parser.add_argument( - "--search", - default="", - help="Filter repositories that contain the given string", - ) + # dieselbe Selektionslogik wie bei install/update/etc.: + add_identifier_arguments(list_parser) list_parser.add_argument( "--status", type=str, default="", - help="Filter repositories by status (case insensitive)", + help=( + "Filter repositories by status (case insensitive). " + "Use /regex/ for regular expressions." + ), + ) + list_parser.add_argument( + "--description", + action="store_true", + help=( + "Show an additional detailed section per repository " + "(description, homepage, tags, categories, paths)." + ), ) + # ------------------------------------------------------------ - # make (wrapper around make in repositories) + # make # ------------------------------------------------------------ make_parser = subparsers.add_parser( "make", @@ -422,7 +477,7 @@ def create_parser(description_text: str) -> argparse.ArgumentParser: add_identifier_arguments(make_deinstall) # ------------------------------------------------------------ - # Proxy commands (git, docker, docker compose) + # Proxy commands (git, docker, docker compose, ...) # ------------------------------------------------------------ register_proxy_commands(subparsers) diff --git a/pkgmgr/cli_core/proxy.py b/pkgmgr/cli_core/proxy.py index 3137dcb..1b7f709 100644 --- a/pkgmgr/cli_core/proxy.py +++ b/pkgmgr/cli_core/proxy.py @@ -1,14 +1,19 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + from __future__ import annotations import argparse +import os import sys -from typing import Dict, List +from typing import Dict, List, Any from pkgmgr.cli_core.context import CLIContext from pkgmgr.clone_repos import clone_repos from pkgmgr.exec_proxy_command import exec_proxy_command -from pkgmgr.get_selected_repos import get_selected_repos from pkgmgr.pull_with_verification import pull_with_verification +from pkgmgr.get_selected_repos import get_selected_repos +from pkgmgr.get_repo_dir import get_repo_dir PROXY_COMMANDS: Dict[str, List[str]] = { @@ -42,10 +47,7 @@ PROXY_COMMANDS: Dict[str, List[str]] = { def _add_proxy_identifier_arguments(parser: argparse.ArgumentParser) -> None: """ - Local copy of the identifier argument set for proxy commands. - - This duplicates the semantics of cli.parser.add_identifier_arguments - to avoid circular imports. + Selection arguments for proxy subcommands. """ parser.add_argument( "identifiers", @@ -66,6 +68,24 @@ def _add_proxy_identifier_arguments(parser: argparse.ArgumentParser) -> None: "yes | pkgmgr {subcommand} --all" ), ) + parser.add_argument( + "--category", + nargs="+", + default=[], + help=( + "Filter repositories by category patterns derived from config " + "filenames or repo metadata (use filename without .yml/.yaml, " + "or /regex/ to use a regular expression)." + ), + ) + parser.add_argument( + "--string", + default="", + help=( + "Filter repositories whose identifier / name / path contains this " + "substring (case-insensitive). Use /regex/ for regular expressions." + ), + ) parser.add_argument( "--preview", action="store_true", @@ -86,12 +106,62 @@ def _add_proxy_identifier_arguments(parser: argparse.ArgumentParser) -> None: ) +def _proxy_has_explicit_selection(args: argparse.Namespace) -> bool: + """ + Same semantics as in the main dispatch: + True if the user explicitly selected repositories. + """ + identifiers = getattr(args, "identifiers", []) or [] + use_all = getattr(args, "all", False) + categories = getattr(args, "category", []) or [] + string_filter = getattr(args, "string", "") or "" + + # Proxy commands currently do not support --tag, so it is not checked here. + return bool( + use_all + or identifiers + or categories + or string_filter + ) + + +def _select_repo_for_current_directory( + ctx: CLIContext, +) -> List[Dict[str, Any]]: + """ + Heuristic: find the repository whose local directory matches the + current working directory or is the closest parent. + """ + cwd = os.path.abspath(os.getcwd()) + candidates: List[tuple[str, Dict[str, Any]]] = [] + + for repo in ctx.all_repositories: + repo_dir = repo.get("directory") + if not repo_dir: + try: + repo_dir = get_repo_dir(ctx.repositories_base_dir, repo) + except Exception: + repo_dir = None + if not repo_dir: + continue + + repo_dir_abs = os.path.abspath(os.path.expanduser(repo_dir)) + if cwd == repo_dir_abs or cwd.startswith(repo_dir_abs + os.sep): + candidates.append((repo_dir_abs, repo)) + + if not candidates: + return [] + + # Pick the repo with the longest (most specific) path. + candidates.sort(key=lambda item: len(item[0]), reverse=True) + return [candidates[0][1]] + + def register_proxy_commands( subparsers: argparse._SubParsersAction, ) -> None: """ - Register proxy commands (git, docker, docker compose) as - top-level subcommands on the given subparsers. + Register proxy subcommands for git, docker, docker compose, ... """ for command, subcommands in PROXY_COMMANDS.items(): for subcommand in subcommands: @@ -100,7 +170,8 @@ def register_proxy_commands( help=f"Proxies '{command} {subcommand}' to repository/ies", description=( f"Executes '{command} {subcommand}' for the " - "identified repos.\nTo recieve more help execute " + "selected repositories. " + "For more details see the underlying tool's help: " f"'{command} {subcommand} --help'" ), formatter_class=argparse.RawTextHelpFormatter, @@ -129,8 +200,8 @@ def register_proxy_commands( def maybe_handle_proxy(args: argparse.Namespace, ctx: CLIContext) -> bool: """ - If the parsed command is a proxy command, execute it and return True. - Otherwise return False to let the main dispatcher continue. + If the top-level command is one of the proxy subcommands + (git / docker / docker compose), handle it here and return True. """ all_proxy_subcommands = { sub for subs in PROXY_COMMANDS.values() for sub in subs @@ -139,12 +210,17 @@ def maybe_handle_proxy(args: argparse.Namespace, ctx: CLIContext) -> bool: if args.command not in all_proxy_subcommands: return False - # Use generic selection semantics for proxies - selected = get_selected_repos( - getattr(args, "all", False), - ctx.all_repositories, - getattr(args, "identifiers", []), - ) + # Default semantics: without explicit selection → repo of current folder. + if _proxy_has_explicit_selection(args): + selected = get_selected_repos(args, ctx.all_repositories) + else: + selected = _select_repo_for_current_directory(ctx) + if not selected: + print( + "[ERROR] No repository matches the current directory. " + "Specify identifiers or use --all/--category/--string." + ) + sys.exit(1) for command, subcommands in PROXY_COMMANDS.items(): if args.command not in subcommands: diff --git a/pkgmgr/config_init.py b/pkgmgr/config_init.py index 0651aa5..a1b9e32 100644 --- a/pkgmgr/config_init.py +++ b/pkgmgr/config_init.py @@ -1,46 +1,122 @@ -import subprocess +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +Initialize user configuration by scanning the repositories base directory. + +This module scans the path: + + defaults_config["directories"]["repositories"] + +with the expected structure: + + {base}/{provider}/{account}/{repository} + +For each discovered repository, the function: + • derives provider, account, repository from the folder structure + • (optionally) determines the latest commit hash via git log + • generates a unique CLI alias + • marks ignore=True for newly discovered repos + • skips repos already known in defaults or user config +""" + +from __future__ import annotations + import os +import subprocess +from typing import Any, Dict + from pkgmgr.generate_alias import generate_alias from pkgmgr.save_user_config import save_user_config -def config_init(user_config, defaults_config, bin_dir,USER_CONFIG_PATH:str): + +def config_init( + user_config: Dict[str, Any], + defaults_config: Dict[str, Any], + bin_dir: str, + user_config_path: str, +) -> None: """ - Scan the base directory (defaults_config["base"]) for repositories. - The folder structure is assumed to be: - {base}/{provider}/{account}/{repository} - For each repository found, automatically determine: - - provider, account, repository from folder names. - - verified: the latest commit (via 'git log -1 --format=%H'). - - alias: generated from the repository name using generate_alias(). - Repositories already defined in defaults_config["repositories"] or user_config["repositories"] are skipped. + Scan the repositories base directory and add missing entries + to the user configuration. """ - repositories_base_dir = os.path.expanduser(defaults_config["directories"]["repositories"]) + + # ------------------------------------------------------------ + # Announce where we will write the result + # ------------------------------------------------------------ + print("============================================================") + print(f"[INIT] Writing user configuration to:") + print(f" {user_config_path}") + print("============================================================") + + repositories_base_dir = os.path.expanduser( + defaults_config["directories"]["repositories"] + ) + + print(f"[INIT] Scanning repository base directory:") + print(f" {repositories_base_dir}") + print("") + if not os.path.isdir(repositories_base_dir): - print(f"Base directory '{repositories_base_dir}' does not exist.") + print(f"[ERROR] Base directory does not exist: {repositories_base_dir}") return - default_keys = {(entry.get("provider"), entry.get("account"), entry.get("repository")) - for entry in defaults_config.get("repositories", [])} - existing_keys = {(entry.get("provider"), entry.get("account"), entry.get("repository")) - for entry in user_config.get("repositories", [])} - existing_aliases = {entry.get("alias") for entry in user_config.get("repositories", []) if entry.get("alias")} + default_keys = { + (entry.get("provider"), entry.get("account"), entry.get("repository")) + for entry in defaults_config.get("repositories", []) + } + existing_keys = { + (entry.get("provider"), entry.get("account"), entry.get("repository")) + for entry in user_config.get("repositories", []) + } + existing_aliases = { + entry.get("alias") + for entry in user_config.get("repositories", []) + if entry.get("alias") + } new_entries = [] + scanned = 0 + skipped = 0 + + # ------------------------------------------------------------ + # Actual scanning + # ------------------------------------------------------------ for provider in os.listdir(repositories_base_dir): provider_path = os.path.join(repositories_base_dir, provider) if not os.path.isdir(provider_path): continue + + print(f"[SCAN] Provider: {provider}") + for account in os.listdir(provider_path): account_path = os.path.join(provider_path, account) if not os.path.isdir(account_path): continue + + print(f"[SCAN] Account: {account}") + for repo_name in os.listdir(account_path): repo_path = os.path.join(account_path, repo_name) if not os.path.isdir(repo_path): continue + + scanned += 1 key = (provider, account, repo_name) - if key in default_keys or key in existing_keys: + + # Already known? + if key in default_keys: + skipped += 1 + print(f"[SKIP] (defaults) {provider}/{account}/{repo_name}") continue + if key in existing_keys: + skipped += 1 + print(f"[SKIP] (user-config) {provider}/{account}/{repo_name}") + continue + + print(f"[ADD] {provider}/{account}/{repo_name}") + + # Determine commit hash try: result = subprocess.run( ["git", "log", "-1", "--format=%H"], @@ -51,25 +127,55 @@ def config_init(user_config, defaults_config, bin_dir,USER_CONFIG_PATH:str): check=True, ) verified = result.stdout.strip() - except Exception as e: + print(f"[INFO] Latest commit: {verified}") + except Exception as exc: verified = "" - print(f"Could not determine latest commit for {repo_name} ({provider}/{account}): {e}") + print(f"[WARN] Could not read commit: {exc}") entry = { "provider": provider, "account": account, "repository": repo_name, "verified": {"commit": verified}, - "ignore": True + "ignore": True, } - alias = generate_alias({"repository": repo_name, "provider": provider, "account": account}, bin_dir, existing_aliases) + + # Alias generation + alias = generate_alias( + { + "repository": repo_name, + "provider": provider, + "account": account, + }, + bin_dir, + existing_aliases, + ) entry["alias"] = alias existing_aliases.add(alias) - new_entries.append(entry) - print(f"Adding new repo entry: {entry}") + print(f"[INFO] Alias generated: {alias}") + new_entries.append(entry) + + print("") # blank line between accounts + + # ------------------------------------------------------------ + # Summary + # ------------------------------------------------------------ + print("============================================================") + print(f"[DONE] Scanned repositories: {scanned}") + print(f"[DONE] Skipped (known): {skipped}") + print(f"[DONE] New entries discovered: {len(new_entries)}") + print("============================================================") + + # ------------------------------------------------------------ + # Save if needed + # ------------------------------------------------------------ if new_entries: user_config.setdefault("repositories", []).extend(new_entries) - save_user_config(user_config,USER_CONFIG_PATH) + save_user_config(user_config, user_config_path) + print(f"[SAVE] Wrote user configuration to:") + print(f" {user_config_path}") else: - print("No new repositories found.") + print("[INFO] No new repositories were added.") + + print("============================================================") diff --git a/pkgmgr/get_selected_repos.py b/pkgmgr/get_selected_repos.py index 23a8bfc..1b94e1c 100644 --- a/pkgmgr/get_selected_repos.py +++ b/pkgmgr/get_selected_repos.py @@ -1,29 +1,170 @@ -import os -import sys -from .resolve_repos import resolve_repos -from .filter_ignored import filter_ignored -from .get_repo_dir import get_repo_dir +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from __future__ import annotations + +import os +import re +from typing import Any, Dict, List, Sequence + +from pkgmgr.resolve_repos import resolve_repos + +Repository = Dict[str, Any] + + +def _compile_maybe_regex(pattern: str): + """ + If pattern is of the form /.../, return a compiled regex (case-insensitive). + Otherwise return None. + """ + if len(pattern) >= 2 and pattern.startswith("/") and pattern.endswith("/"): + try: + return re.compile(pattern[1:-1], re.IGNORECASE) + except re.error: + return None + return None + + +def _match_pattern(value: str, pattern: str) -> bool: + """ + Match a value against a pattern that may be a substring or /regex/. + """ + if not pattern: + return True + regex = _compile_maybe_regex(pattern) + if regex: + return bool(regex.search(value)) + return pattern.lower() in value.lower() + + +def _match_any(values: Sequence[str], pattern: str) -> bool: + """ + Return True if any of the values matches the pattern. + """ + for v in values: + if _match_pattern(v, pattern): + return True + return False + + +def _build_identifier_string(repo: Repository) -> str: + """ + Build a combined identifier string for string-based filtering. + """ + provider = str(repo.get("provider", "")) + account = str(repo.get("account", "")) + repository = str(repo.get("repository", "")) + alias = str(repo.get("alias", "")) + description = str(repo.get("description", "")) + directory = str(repo.get("directory", "")) + + parts = [ + provider, + account, + repository, + alias, + f"{provider}/{account}/{repository}", + description, + directory, + ] + return " ".join(p for p in parts if p) + + +def _apply_filters( + repos: List[Repository], + string_pattern: str, + category_patterns: List[str], + tag_patterns: List[str], +) -> List[Repository]: + if not string_pattern and not category_patterns and not tag_patterns: + return repos + + filtered: List[Repository] = [] + + for repo in repos: + # String filter + if string_pattern: + ident_str = _build_identifier_string(repo) + if not _match_pattern(ident_str, string_pattern): + continue + + # Category filter: nur echte Kategorien, KEINE Tags + if category_patterns: + cats: List[str] = [] + cats.extend(map(str, repo.get("category_files", []))) + if "category" in repo: + cats.append(str(repo["category"])) + + if not cats: + continue + + ok = True + for pat in category_patterns: + if not _match_any(cats, pat): + ok = False + break + if not ok: + continue + + # Tag filter: ausschließlich YAML-Tags + if tag_patterns: + tags: List[str] = list(map(str, repo.get("tags", []))) + if not tags: + continue + + ok = True + for pat in tag_patterns: + if not _match_any(tags, pat): + ok = False + break + if not ok: + continue + + filtered.append(repo) -def get_selected_repos(show_all: bool, all_repos_list, identifiers=None): - if show_all: - selected = all_repos_list - else: - selected = resolve_repos(identifiers, all_repos_list) - - # If no repositories were found using the provided identifiers, - # try to automatically select based on the current directory: - if not selected: - current_dir = os.getcwd() - directory_name = os.path.basename(current_dir) - # Pack the directory name in a list since resolve_repos expects a list. - auto_selected = resolve_repos([directory_name], all_repos_list) - if auto_selected: - # Check if the path of the first auto-selected repository matches the current directory. - if os.path.abspath(auto_selected[0].get("directory")) == os.path.abspath(current_dir): - print(f"Repository {auto_selected[0]['repository']} has been auto-selected by path.") - selected = auto_selected - filtered = filter_ignored(selected) - if not filtered: - print("Error: No repositories had been selected.") - sys.exit(4) return filtered + +def get_selected_repos(args, all_repositories: List[Repository]) -> List[Repository]: + """ + Compute the list of repositories selected by CLI arguments. + + Modes: + - If identifiers are given: select via resolve_repos() from all_repositories. + - Else if any of --category/--string/--tag is used: start from all_repositories + and apply filters. + - Else if --all is set: select all_repositories. + - Else: try to select the repository of the current working directory. + """ + identifiers: List[str] = getattr(args, "identifiers", []) or [] + use_all: bool = bool(getattr(args, "all", False)) + category_patterns: List[str] = getattr(args, "category", []) or [] + string_pattern: str = getattr(args, "string", "") or "" + tag_patterns: List[str] = getattr(args, "tag", []) or [] + + has_filters = bool(category_patterns or string_pattern or tag_patterns) + + # 1) Explicit identifiers win + if identifiers: + base = resolve_repos(identifiers, all_repositories) + return _apply_filters(base, string_pattern, category_patterns, tag_patterns) + + # 2) Filter-only mode: start from all repositories + if has_filters: + return _apply_filters(list(all_repositories), string_pattern, category_patterns, tag_patterns) + + # 3) --all (no filters): all repos + if use_all: + return list(all_repositories) + + # 4) Fallback: try to select repository of current working directory + cwd = os.path.abspath(os.getcwd()) + by_dir = [ + repo + for repo in all_repositories + if os.path.abspath(str(repo.get("directory", ""))) == cwd + ] + if by_dir: + return by_dir + + # No specific match -> empty list + return [] diff --git a/pkgmgr/list_repositories.py b/pkgmgr/list_repositories.py index c391207..be1f005 100644 --- a/pkgmgr/list_repositories.py +++ b/pkgmgr/list_repositories.py @@ -1,108 +1,352 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +Pretty-print repository list with status, categories, tags and path. + +- Tags come exclusively from YAML: repo["tags"]. +- Categories come from repo["category_files"] (YAML file names without + .yml/.yaml) and optional repo["category"]. +- Optional detail mode (--description) prints an extended section per + repository with description, homepage, etc. +""" + +from __future__ import annotations + import os -from pkgmgr.get_repo_identifier import get_repo_identifier -from pkgmgr.get_repo_dir import get_repo_dir +import re +from textwrap import wrap +from typing import Any, Dict, List, Optional -def list_repositories(all_repos, repositories_base_dir, bin_dir, search_filter="", status_filter=""): +Repository = Dict[str, Any] + +RESET = "\033[0m" +BOLD = "\033[1m" +DIM = "\033[2m" +GREEN = "\033[32m" +YELLOW = "\033[33m" +RED = "\033[31m" +MAGENTA = "\033[35m" +GREY = "\033[90m" + + +def _compile_maybe_regex(pattern: str) -> Optional[re.Pattern[str]]: """ - Lists all repositories with their attributes and status information. - The repositories are sorted in ascending order by their identifier. - - Parameters: - all_repos (list): List of repository configurations. - repositories_base_dir (str): The base directory where repositories are located. - bin_dir (str): The directory where executable wrappers are stored. - search_filter (str): Filter for repository attributes (case insensitive). - status_filter (str): Filter for computed status info (case insensitive). - - For each repository, the identifier is printed in bold, the description (if available) - in italic, then all other attributes and computed status are printed. - If the repository is installed, a hint is displayed under the attributes. - Repositories are filtered out if either the search_filter is not found in any attribute or - if the status_filter is not found in the computed status string. + If pattern is of the form /.../, return a compiled regex (case-insensitive). + Otherwise return None. """ - search_filter = search_filter.lower() if search_filter else "" - status_filter = status_filter.lower() if status_filter else "" + if not pattern: + return None + if len(pattern) >= 2 and pattern.startswith("/") and pattern.endswith("/"): + try: + return re.compile(pattern[1:-1], re.IGNORECASE) + except re.error: + return None + return None - # Define status colors using colors not used for other attributes: - # Avoid red (for ignore), blue (for homepage) and yellow (for verified). - status_colors = { - "Installed": "\033[1;32m", # Green - "Not Installed": "\033[1;35m", # Magenta - "Cloned": "\033[1;36m", # Cyan - "Clonable": "\033[1;37m", # White - "Ignored": "\033[38;5;208m", # Orange (extended) - "Active": "\033[38;5;129m", # Light Purple (extended) - "Installable": "\033[38;5;82m" # Light Green (extended) - } - # Sort all repositories by their identifier in ascending order. - sorted_repos = sorted(all_repos, key=lambda repo: get_repo_identifier(repo, all_repos)) +def _status_matches(status: str, status_filter: str) -> bool: + """ + Match a status string against an optional filter (substring or /regex/). + """ + if not status_filter: + return True - for repo in sorted_repos: - # Combine all attribute values into one string for filtering. - repo_text = " ".join(str(v) for v in repo.values()).lower() - if search_filter and search_filter not in repo_text: + regex = _compile_maybe_regex(status_filter) + if regex: + return bool(regex.search(status)) + return status_filter.lower() in status.lower() + + +def _compute_repo_dir(repositories_base_dir: str, repo: Repository) -> str: + """ + Compute the local directory for a repository. + + If the repository already has a 'directory' key, that is used; + otherwise the path is constructed from provider/account/repository + under repositories_base_dir. + """ + if repo.get("directory"): + return os.path.expanduser(str(repo["directory"])) + + provider = str(repo.get("provider", "")) + account = str(repo.get("account", "")) + repository = str(repo.get("repository", "")) + + return os.path.join( + os.path.expanduser(repositories_base_dir), + provider, + account, + repository, + ) + + +def _compute_status( + repo: Repository, + repo_dir: str, + binaries_dir: str, +) -> str: + """ + Compute a human-readable status string, e.g. 'present,alias,ignored'. + """ + parts: List[str] = [] + + exists = os.path.isdir(repo_dir) + if exists: + parts.append("present") + else: + parts.append("absent") + + alias = repo.get("alias") + if alias: + alias_path = os.path.join(os.path.expanduser(binaries_dir), str(alias)) + if os.path.exists(alias_path): + parts.append("alias") + else: + parts.append("alias-missing") + + if repo.get("ignore"): + parts.append("ignored") + + return ",".join(parts) if parts else "-" + + +def _color_status(status_padded: str) -> str: + """ + Color individual status flags inside a padded status string. + + Input is expected to be right-padded to the column width. + + Color mapping: + - present -> green + - absent -> red + - alias -> red + - alias-missing -> red + - ignored -> magenta + - other -> default + """ + core = status_padded.rstrip() + pad_spaces = len(status_padded) - len(core) + + plain_parts = core.split(",") if core else [] + colored_parts: List[str] = [] + + for raw_part in plain_parts: + name = raw_part.strip() + if not name: continue - # Compute status information for the repository. - identifier = get_repo_identifier(repo, all_repos) - executable_path = os.path.join(bin_dir, identifier) - repo_dir = get_repo_dir(repositories_base_dir, repo) - status_list = [] - - # Check if the executable exists (Installed). - if os.path.exists(executable_path): - status_list.append("Installed") + if name == "present": + color = GREEN + elif name == "absent": + color = MAGENTA + elif name in ("alias", "alias-missing"): + color = YELLOW + elif name == "ignored": + color = MAGENTA else: - status_list.append("Not Installed") - # Check if the repository directory exists (Cloned). - if os.path.exists(repo_dir): - status_list.append("Cloned") - else: - status_list.append("Clonable") - # Mark ignored repositories. - if repo.get("ignore", False): - status_list.append("Ignored") - else: - status_list.append("Active") - # Define installable as cloned but not installed. - if os.path.exists(repo_dir) and not os.path.exists(executable_path): - status_list.append("Installable") + color = "" - # Build a colored status string. - colored_statuses = [f"{status_colors.get(s, '')}{s}\033[0m" for s in status_list] - status_str = ", ".join(colored_statuses) + if color: + colored_parts.append(f"{color}{name}{RESET}") + else: + colored_parts.append(name) - # If a status_filter is provided, only display repos whose status contains the filter. - if status_filter and status_filter not in status_str.lower(): + colored_core = ",".join(colored_parts) + return colored_core + (" " * pad_spaces) + + +def list_repositories( + repositories: List[Repository], + repositories_base_dir: str, + binaries_dir: str, + search_filter: str = "", + status_filter: str = "", + extra_tags: Optional[List[str]] = None, + show_description: bool = False, +) -> None: + """ + Print a table of repositories and (optionally) detailed descriptions. + + Parameters + ---------- + repositories: + Repositories to show (usually already filtered by get_selected_repos). + repositories_base_dir: + Base directory where repositories live. + binaries_dir: + Directory where alias symlinks live. + search_filter: + Optional substring/regex filter on identifier and metadata. + status_filter: + Optional filter on computed status. + extra_tags: + Additional tags to show for each repository (CLI overlay only). + show_description: + If True, print a detailed block for each repository after the table. + """ + if extra_tags is None: + extra_tags = [] + + search_regex = _compile_maybe_regex(search_filter) + rows: List[Dict[str, Any]] = [] + + # ------------------------------------------------------------------ + # Build rows + # ------------------------------------------------------------------ + for repo in repositories: + identifier = str(repo.get("repository") or repo.get("alias") or "") + alias = str(repo.get("alias") or "") + provider = str(repo.get("provider") or "") + account = str(repo.get("account") or "") + description = str(repo.get("description") or "") + homepage = str(repo.get("homepage") or "") + + repo_dir = _compute_repo_dir(repositories_base_dir, repo) + status = _compute_status(repo, repo_dir, binaries_dir) + + if not _status_matches(status, status_filter): continue - # Display repository details: - # Print the identifier in bold. - print(f"\033[1m{identifier}\033[0m") - # Print the description in italic if it exists. - description = repo.get("description") + if search_filter: + haystack = " ".join( + [ + identifier, + alias, + provider, + account, + description, + homepage, + repo_dir, + ] + ) + if search_regex: + if not search_regex.search(haystack): + continue + else: + if search_filter.lower() not in haystack.lower(): + continue + + categories: List[str] = [] + categories.extend(map(str, repo.get("category_files", []))) + if repo.get("category"): + categories.append(str(repo["category"])) + + yaml_tags: List[str] = list(map(str, repo.get("tags", []))) + display_tags: List[str] = sorted( + set(yaml_tags + list(map(str, extra_tags))) + ) + + rows.append( + { + "repo": repo, + "identifier": identifier, + "status": status, + "categories": categories, + "tags": display_tags, + "dir": repo_dir, + } + ) + + if not rows: + print("No repositories matched the given filters.") + return + + # ------------------------------------------------------------------ + # Table section (header grey, values white, per-flag colored status) + # ------------------------------------------------------------------ + ident_width = max(len("IDENTIFIER"), max(len(r["identifier"]) for r in rows)) + status_width = max(len("STATUS"), max(len(r["status"]) for r in rows)) + cat_width = max( + len("CATEGORIES"), + max((len(",".join(r["categories"])) for r in rows), default=0), + ) + tag_width = max( + len("TAGS"), + max((len(",".join(r["tags"])) for r in rows), default=0), + ) + + header = ( + f"{GREY}{BOLD}" + f"{'IDENTIFIER'.ljust(ident_width)} " + f"{'STATUS'.ljust(status_width)} " + f"{'CATEGORIES'.ljust(cat_width)} " + f"{'TAGS'.ljust(tag_width)} " + f"DIR" + f"{RESET}" + ) + print(header) + print("-" * (ident_width + status_width + cat_width + tag_width + 10 + 40)) + + for r in rows: + ident_col = r["identifier"].ljust(ident_width) + cat_col = ",".join(r["categories"]).ljust(cat_width) + tag_col = ",".join(r["tags"]).ljust(tag_width) + dir_col = r["dir"] + status = r["status"] + + status_padded = status.ljust(status_width) + status_colored = _color_status(status_padded) + + print( + f"{ident_col} " + f"{status_colored} " + f"{cat_col} " + f"{tag_col} " + f"{dir_col}" + ) + + # ------------------------------------------------------------------ + # Detailed section (alias value red, same status coloring) + # ------------------------------------------------------------------ + if not show_description: + return + + print() + print(f"{BOLD}Detailed repository information:{RESET}") + print() + + for r in rows: + repo = r["repo"] + identifier = r["identifier"] + alias = str(repo.get("alias") or "") + provider = str(repo.get("provider") or "") + account = str(repo.get("account") or "") + repository = str(repo.get("repository") or "") + description = str(repo.get("description") or "") + homepage = str(repo.get("homepage") or "") + categories = r["categories"] + tags = r["tags"] + repo_dir = r["dir"] + status = r["status"] + + print(f"{BOLD}{identifier}{RESET}") + + print(f" Provider: {provider}") + print(f" Account: {account}") + print(f" Repository: {repository}") + + # Alias value highlighted in red + if alias: + print(f" Alias: {RED}{alias}{RESET}") + + status_colored = _color_status(status) + print(f" Status: {status_colored}") + + if categories: + print(f" Categories: {', '.join(categories)}") + + if tags: + print(f" Tags: {', '.join(tags)}") + + print(f" Directory: {repo_dir}") + + if homepage: + print(f" Homepage: {homepage}") + if description: - print(f"\n\033[3m{description}\033[0m") - print("\nAttributes:") - # Loop through all attributes. - for key, value in repo.items(): - formatted_value = str(value) - # Special formatting for the "verified" attribute (yellow). - if key == "verified" and value: - formatted_value = f"\033[1;33m{value}\033[0m" - # Special formatting for the "ignore" flag (red if True). - if key == "ignore" and value: - formatted_value = f"\033[1;31m{value}\033[0m" - if key == "description": - continue - # Highlight homepage in blue. - if key.lower() == "homepage" and value: - formatted_value = f"\033[1;34m{value}\033[0m" - print(f" {key}: {formatted_value}") - # Always display the computed status. - print(f" Status: {status_str}") - # If the repository is installed, display a hint for more info. - if os.path.exists(executable_path): - print(f"\nMore information and help: \033[1;4mpkgmgr {identifier} --help\033[0m\n") - print("-" * 40) + print(" Description:") + for line in wrap(description, width=78): + print(f" {line}") + + print() diff --git a/pkgmgr/load_config.py b/pkgmgr/load_config.py index 8b93194..49d8f94 100644 --- a/pkgmgr/load_config.py +++ b/pkgmgr/load_config.py @@ -1,30 +1,305 @@ -import sys -import yaml -import os -from .get_repo_dir import get_repo_dir -DEFAULT_CONFIG_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "../","config", "defaults.yaml") +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- -def load_config(user_config_path): - """Load configuration from defaults and merge in user config if present.""" - if not os.path.exists(DEFAULT_CONFIG_PATH): - print(f"Default configuration file '{DEFAULT_CONFIG_PATH}' not found.") - sys.exit(5) - with open(DEFAULT_CONFIG_PATH, 'r') as f: - config = yaml.safe_load(f) - if "directories" not in config or "repositories" not in config: - print("Default config file must contain 'directories' and 'repositories' keys.") - sys.exit(6) - if os.path.exists(user_config_path): - with open(user_config_path, 'r') as f: - user_config = yaml.safe_load(f) - if user_config: - if "directories" in user_config: - config["directories"] = user_config["directories"] - if "repositories" in user_config: - config["repositories"].extend(user_config["repositories"]) - for repository in config["repositories"]: - # You can overwritte the directory path in the config - if "directory" not in repository: - directory = get_repo_dir(config["directories"]["repositories"], repository) - repository["directory"] = os.path.expanduser(directory) - return config \ No newline at end of file +""" +Load and merge pkgmgr configuration. + +Layering rules: + +1. Defaults / category files: + - Zuerst werden alle *.yml/*.yaml (außer config.yaml) im + Benutzerverzeichnis geladen: + ~/.config/pkgmgr/ + + - Falls dort keine passenden Dateien existieren, wird auf die im + Paket / Projekt mitgelieferten Config-Verzeichnisse zurückgegriffen: + + /config_defaults + /config + /config_defaults + /config + + Dabei werden ebenfalls alle *.yml/*.yaml als Layer geladen. + + - Der Dateiname ohne Endung (stem) wird als Kategorie-Name + verwendet und in repo["category_files"] eingetragen. + +2. User config: + - ~/.config/pkgmgr/config.yaml (oder der übergebene Pfad) + wird geladen und PER LISTEN-MERGE über die Defaults gelegt: + - directories: dict deep-merge + - repositories: per _merge_repo_lists (kein Löschen!) + +3. Ergebnis: + - Ein dict mit mindestens: + config["directories"] (dict) + config["repositories"] (list[dict]) +""" + +from __future__ import annotations + +import os +from pathlib import Path +from typing import Any, Dict, List, Tuple + +import yaml + +Repo = Dict[str, Any] + + +# --------------------------------------------------------------------------- +# Hilfsfunktionen +# --------------------------------------------------------------------------- + +def _deep_merge(base: Dict[str, Any], override: Dict[str, Any]) -> Dict[str, Any]: + """ + Recursively merge two dictionaries. + + Values from `override` win over values in `base`. + """ + for key, value in override.items(): + if ( + key in base + and isinstance(base[key], dict) + and isinstance(value, dict) + ): + _deep_merge(base[key], value) + else: + base[key] = value + return base + + +def _repo_key(repo: Repo) -> Tuple[str, str, str]: + """ + Normalised key for identifying a repository across config files. + """ + return ( + str(repo.get("provider", "")), + str(repo.get("account", "")), + str(repo.get("repository", "")), + ) + + +def _merge_repo_lists( + base_list: List[Repo], + new_list: List[Repo], + category_name: str | None = None, +) -> List[Repo]: + """ + Merge two repository lists, matching by (provider, account, repository). + + - Wenn ein Repo aus new_list noch nicht existiert, wird es hinzugefügt. + - Wenn es existiert, werden seine Felder per Deep-Merge überschrieben. + - Wenn category_name gesetzt ist, wird dieser in + repo["category_files"] eingetragen. + """ + index: Dict[Tuple[str, str, str], Repo] = { + _repo_key(r): r for r in base_list + } + + for src in new_list: + key = _repo_key(src) + if key == ("", "", ""): + # Unvollständiger Schlüssel -> einfach anhängen + dst = dict(src) + if category_name: + dst.setdefault("category_files", []) + if category_name not in dst["category_files"]: + dst["category_files"].append(category_name) + base_list.append(dst) + continue + + existing = index.get(key) + if existing is None: + dst = dict(src) + if category_name: + dst.setdefault("category_files", []) + if category_name not in dst["category_files"]: + dst["category_files"].append(category_name) + base_list.append(dst) + index[key] = dst + else: + _deep_merge(existing, src) + if category_name: + existing.setdefault("category_files", []) + if category_name not in existing["category_files"]: + existing["category_files"].append(category_name) + + return base_list + + +def _load_yaml_file(path: Path) -> Dict[str, Any]: + """ + Load a single YAML file as dict. Non-dicts yield {}. + """ + if not path.is_file(): + return {} + with path.open("r", encoding="utf-8") as f: + data = yaml.safe_load(f) or {} + if not isinstance(data, dict): + return {} + return data + + +def _load_layer_dir( + config_dir: Path, + skip_filename: str | None = None, +) -> Dict[str, Any]: + """ + Load all *.yml/*.yaml from a directory as layered defaults. + + - skip_filename: Dateiname (z.B. "config.yaml"), der ignoriert + werden soll (z.B. User-Config). + + Rückgabe: + { + "directories": {...}, + "repositories": [...], + } + """ + defaults: Dict[str, Any] = {"directories": {}, "repositories": []} + + if not config_dir.is_dir(): + return defaults + + yaml_files = [ + p + for p in config_dir.iterdir() + if p.is_file() + and p.suffix.lower() in (".yml", ".yaml") + and (skip_filename is None or p.name != skip_filename) + ] + if not yaml_files: + return defaults + + yaml_files.sort(key=lambda p: p.name) + + for path in yaml_files: + data = _load_yaml_file(path) + category_name = path.stem # Dateiname ohne .yml/.yaml + + dirs = data.get("directories") + if isinstance(dirs, dict): + defaults.setdefault("directories", {}) + _deep_merge(defaults["directories"], dirs) + + repos = data.get("repositories") + if isinstance(repos, list): + defaults.setdefault("repositories", []) + _merge_repo_lists( + defaults["repositories"], + repos, + category_name=category_name, + ) + + return defaults + + +def _load_defaults_from_package_or_project() -> Dict[str, Any]: + """ + Fallback: Versuche Defaults aus dem installierten Paket ODER + aus dem Projekt-Root zu laden: + + /config_defaults + /config + /config_defaults + /config + """ + try: + import pkgmgr # type: ignore + except Exception: + return {"directories": {}, "repositories": []} + + pkg_root = Path(pkgmgr.__file__).resolve().parent + project_root = pkg_root.parent + + candidates = [ + pkg_root / "config_defaults", + pkg_root / "config", + project_root / "config_defaults", + project_root / "config", + ] + + for cand in candidates: + defaults = _load_layer_dir(cand, skip_filename=None) + if defaults["directories"] or defaults["repositories"]: + return defaults + + return {"directories": {}, "repositories": []} + + +# --------------------------------------------------------------------------- +# Hauptfunktion +# --------------------------------------------------------------------------- + +def load_config(user_config_path: str) -> Dict[str, Any]: + """ + Load and merge configuration for pkgmgr. + + Schritte: + 1. Ermittle ~/.config/pkgmgr/ (oder das Verzeichnis von user_config_path). + 2. Lade alle *.yml/*.yaml dort (außer der User-Config selbst) als + Defaults / Kategorie-Layer. + 3. Wenn dort nichts gefunden wurde, Fallback auf Paket/Projekt. + 4. Lade die User-Config-Datei selbst (falls vorhanden). + 5. Merge: + - directories: deep-merge (Defaults <- User) + - repositories: _merge_repo_lists (Defaults <- User) + """ + user_config_path_expanded = os.path.expanduser(user_config_path) + user_cfg_path = Path(user_config_path_expanded) + + config_dir = user_cfg_path.parent + if not str(config_dir): + # Fallback, falls jemand nur "config.yaml" übergibt + config_dir = Path(os.path.expanduser("~/.config/pkgmgr")) + config_dir.mkdir(parents=True, exist_ok=True) + + user_cfg_name = user_cfg_path.name + + # 1+2) Defaults / Kategorie-Layer aus dem User-Verzeichnis + defaults = _load_layer_dir(config_dir, skip_filename=user_cfg_name) + + # 3) Falls dort nichts gefunden wurde, Fallback auf Paket/Projekt + if not defaults["directories"] and not defaults["repositories"]: + defaults = _load_defaults_from_package_or_project() + + defaults.setdefault("directories", {}) + defaults.setdefault("repositories", []) + + # 4) User-Config + user_cfg: Dict[str, Any] = {} + if user_cfg_path.is_file(): + user_cfg = _load_yaml_file(user_cfg_path) + user_cfg.setdefault("directories", {}) + user_cfg.setdefault("repositories", []) + + # 5) Merge: directories deep-merge, repositories listen-merge + merged: Dict[str, Any] = {} + + # directories + merged["directories"] = {} + _deep_merge(merged["directories"], defaults["directories"]) + _deep_merge(merged["directories"], user_cfg["directories"]) + + # repositories + merged["repositories"] = [] + _merge_repo_lists(merged["repositories"], defaults["repositories"], category_name=None) + _merge_repo_lists(merged["repositories"], user_cfg["repositories"], category_name=None) + + # andere Top-Level-Keys (falls vorhanden) + other_keys = (set(defaults.keys()) | set(user_cfg.keys())) - { + "directories", + "repositories", + } + for key in other_keys: + base_val = defaults.get(key) + override_val = user_cfg.get(key) + if isinstance(base_val, dict) and isinstance(override_val, dict): + merged[key] = _deep_merge(dict(base_val), override_val) + elif override_val is not None: + merged[key] = override_val + else: + merged[key] = base_val + + return merged diff --git a/pyproject.toml b/pyproject.toml index a878caa..918a25f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,7 +7,7 @@ build-backend = "setuptools.build_meta" [project] name = "package-manager" -version = "0.4.2" +version = "0.4.3" description = "Kevin's package-manager tool (pkgmgr)" readme = "README.md" requires-python = ">=3.11" diff --git a/tests/e2e/test_integration_list_commands.py b/tests/e2e/test_integration_list_commands.py new file mode 100644 index 0000000..972f6fb --- /dev/null +++ b/tests/e2e/test_integration_list_commands.py @@ -0,0 +1,64 @@ +from __future__ import annotations + +import os +import runpy +import sys +import unittest + +from test_integration_version_commands import PROJECT_ROOT + + +class TestIntegrationListCommands(unittest.TestCase): + """ + Integration tests for `pkgmgr list` with the new selection and + description behaviour. + """ + + def _run_pkgmgr(self, args: list[str], cwd: str | None = None) -> None: + cmd_repr = "pkgmgr " + " ".join(args) + original_argv = list(sys.argv) + original_cwd = os.getcwd() + + try: + if cwd is not None: + os.chdir(cwd) + + # Simulate: pkgmgr + sys.argv = ["pkgmgr"] + args + + try: + runpy.run_module("main", run_name="__main__") + except SystemExit as exc: + code = exc.code if isinstance(exc.code, int) else str(exc.code) + if code != 0: + print() + print(f"[TEST] Command : {cmd_repr}") + print(f"[TEST] Working directory: {os.getcwd()}") + print(f"[TEST] Exit code : {code}") + raise AssertionError( + f"{cmd_repr!r} failed with exit code {code}. " + "Scroll up to inspect the output printed before failure." + ) from exc + finally: + os.chdir(original_cwd) + sys.argv = original_argv + + def test_list_all_repositories(self) -> None: + """ + `pkgmgr list --all` should successfully print the summary table. + """ + self._run_pkgmgr(["list", "--all"], cwd=PROJECT_ROOT) + + def test_list_all_with_description(self) -> None: + """ + `pkgmgr list --all --description` should print the table plus the + detailed section for each repository. + """ + self._run_pkgmgr(["list", "--all", "--description"], cwd=PROJECT_ROOT) + + def test_list_with_string_filter(self) -> None: + """ + `pkgmgr list --string pkgmgr` exercises the new string-based + selection logic on top of the defaults + user config. + """ + self._run_pkgmgr(["list", "--string", "pkgmgr"], cwd=PROJECT_ROOT) diff --git a/tests/e2e/test_integration_proxy_commands.py b/tests/e2e/test_integration_proxy_commands.py new file mode 100644 index 0000000..cfe8202 --- /dev/null +++ b/tests/e2e/test_integration_proxy_commands.py @@ -0,0 +1,65 @@ +from __future__ import annotations + +import os +import runpy +import sys +import unittest + +from test_integration_version_commands import PROJECT_ROOT + + +class TestIntegrationProxyCommands(unittest.TestCase): + """ + Integration tests for proxy commands (e.g. git pull) using the new + selection logic and `--preview` mode so no real changes are made. + """ + + def _run_pkgmgr(self, args: list[str], cwd: str | None = None) -> None: + cmd_repr = "pkgmgr " + " ".join(args) + original_argv = list(sys.argv) + original_cwd = os.getcwd() + + try: + if cwd is not None: + os.chdir(cwd) + + # Simulate: pkgmgr + sys.argv = ["pkgmgr"] + args + + try: + runpy.run_module("main", run_name="__main__") + except SystemExit as exc: + code = exc.code if isinstance(exc.code, int) else str(exc.code) + if code != 0: + print() + print(f"[TEST] Command : {cmd_repr}") + print(f"[TEST] Working directory: {os.getcwd()}") + print(f"[TEST] Exit code : {code}") + raise AssertionError( + f"{cmd_repr!r} failed with exit code {code}. " + "Scroll up to inspect the output printed before failure." + ) from exc + finally: + os.chdir(original_cwd) + sys.argv = original_argv + + def test_git_pull_preview_for_pkgmgr(self) -> None: + """ + `pkgmgr pull --preview pkgmgr` should go through the proxy layer, + use get_selected_repos() and only print the underlying git pull + command without executing it. + """ + self._run_pkgmgr( + ["pull", "--preview", "pkgmgr"], + cwd=PROJECT_ROOT, + ) + + def test_git_pull_preview_with_string_filter(self) -> None: + """ + `pkgmgr pull --preview --string pkgmgr` exercises the proxy + + filter-only selection path. + """ + self._run_pkgmgr( + ["pull", "--preview", "--string", "pkgmgr"], + cwd=PROJECT_ROOT, + ) diff --git a/tests/unit/pkgmgr/test_cli.py b/tests/unit/pkgmgr/test_cli.py index a2305f9..4a3658c 100644 --- a/tests/unit/pkgmgr/test_cli.py +++ b/tests/unit/pkgmgr/test_cli.py @@ -73,20 +73,16 @@ class TestCliVersion(unittest.TestCase): # Patch get_selected_repos so that 'version' operates on our temp dir. # In the new modular CLI this function is used inside # pkgmgr.cli_core.dispatch, so we patch it there. - def _fake_selected_repos( - all_flag: bool, - repos: List[dict], - identifiers: List[str], - ): - # We always return exactly one "repository" whose directory is the temp dir. + def _fake_selected_repos(args, all_repositories): return [ - { - "provider": "github.com", - "account": "test", - "repository": "pkgmgr-test", - "directory": self._tmp_dir.name, - } - ] + { + "provider": "github.com", + "account": "test", + "repository": "pkgmgr-test", + "directory": self._tmp_dir.name, + } + ] + self._patch_get_selected_repos = mock.patch( "pkgmgr.cli_core.dispatch.get_selected_repos",