From fb9ea2e24b247c1752ae9646e60e606a721ab9fd Mon Sep 17 00:00:00 2001 From: Kevin Veen-Birkenbach Date: Thu, 6 Mar 2025 11:10:11 +0100 Subject: [PATCH] Refactored main.py into function files --- main.py | 605 ++-------------------------------- pkgmgr/__init__.py | 0 pkgmgr/clone_repos.py | 40 +++ pkgmgr/config_init.py | 73 ++++ pkgmgr/create_executable.py | 73 ++++ pkgmgr/deinstall_repos.py | 18 + pkgmgr/delete_repos.py | 22 ++ pkgmgr/exec_git_command.py | 12 + pkgmgr/filter_ignored.py | 3 + pkgmgr/generate_alias.py | 42 +++ pkgmgr/get_repo_dir.py | 15 + pkgmgr/get_repo_identifier.py | 12 + pkgmgr/get_selected_repos.py | 14 + pkgmgr/install_repos.py | 14 + pkgmgr/interactive_add.py | 38 +++ pkgmgr/list_repositories.py | 102 ++++++ pkgmgr/load_config.py | 25 ++ pkgmgr/resolve_repos.py | 28 ++ pkgmgr/run_command.py | 18 + pkgmgr/save_user_config.py | 9 + pkgmgr/show_config.py | 14 + pkgmgr/status_repos.py | 11 + pkgmgr/update_repos.py | 8 + 23 files changed, 619 insertions(+), 577 deletions(-) create mode 100644 pkgmgr/__init__.py create mode 100644 pkgmgr/clone_repos.py create mode 100644 pkgmgr/config_init.py create mode 100644 pkgmgr/create_executable.py create mode 100644 pkgmgr/deinstall_repos.py create mode 100644 pkgmgr/delete_repos.py create mode 100644 pkgmgr/exec_git_command.py create mode 100644 pkgmgr/filter_ignored.py create mode 100644 pkgmgr/generate_alias.py create mode 100644 pkgmgr/get_repo_dir.py create mode 100644 pkgmgr/get_repo_identifier.py create mode 100644 pkgmgr/get_selected_repos.py create mode 100644 pkgmgr/install_repos.py create mode 100644 pkgmgr/interactive_add.py create mode 100644 pkgmgr/list_repositories.py create mode 100644 pkgmgr/load_config.py create mode 100644 pkgmgr/resolve_repos.py create mode 100644 pkgmgr/run_command.py create mode 100644 pkgmgr/save_user_config.py create mode 100644 pkgmgr/show_config.py create mode 100644 pkgmgr/status_repos.py create mode 100644 pkgmgr/update_repos.py diff --git a/main.py b/main.py index 90ce4dd..128001f 100644 --- a/main.py +++ b/main.py @@ -1,18 +1,34 @@ -import re -import hashlib import os -import subprocess -import shutil -import sys import yaml import argparse import json # Define configuration file paths. -DEFAULT_CONFIG_PATH = os.path.join("config", "defaults.yaml") USER_CONFIG_PATH = os.path.join("config", "config.yaml") BIN_DIR = os.path.expanduser("~/.local/bin") +from pkgmgr.clone_repos import clone_repos +from pkgmgr.config_init import config_init +from pkgmgr.create_executable import create_executable +from pkgmgr.deinstall_repos import deinstall_repos +from pkgmgr.delete_repos import delete_repos +from pkgmgr.exec_git_command import exec_git_command +from pkgmgr.filter_ignored import filter_ignored +from pkgmgr.generate_alias import generate_alias +from pkgmgr.get_repo_dir import get_repo_dir +from pkgmgr.get_repo_identifier import get_repo_identifier +from pkgmgr.get_selected_repos import get_selected_repos +from pkgmgr.install_repos import install_repos +from pkgmgr.interactive_add import interactive_add +from pkgmgr.list_repositories import list_repositories +from pkgmgr.load_config import load_config +from pkgmgr.resolve_repos import resolve_repos +from pkgmgr.run_command import run_command +from pkgmgr.save_user_config import save_user_config +from pkgmgr.show_config import show_config +from pkgmgr.status_repos import status_repos +from pkgmgr.update_repos import update_repos + # Commands proxied by package-manager GIT_DEFAULT_COMMANDS = [ "pull", @@ -27,575 +43,9 @@ GIT_DEFAULT_COMMANDS = [ "commit" ] -def load_config(): - """Load configuration from defaults and merge in user config if present.""" - if not os.path.exists(DEFAULT_CONFIG_PATH): - print(f"Default configuration file '{DEFAULT_CONFIG_PATH}' not found.") - sys.exit(1) - with open(DEFAULT_CONFIG_PATH, 'r') as f: - config = yaml.safe_load(f) - if "directories" not in config or "repositories" not in config: - print("Default config file must contain 'directories' and 'repositories' keys.") - sys.exit(1) - if os.path.exists(USER_CONFIG_PATH): - with open(USER_CONFIG_PATH, 'r') as f: - user_config = yaml.safe_load(f) - if user_config: - if "directories" in user_config: - config["directories"] = user_config["directories"] - if "repositories" in user_config: - config["repositories"].extend(user_config["repositories"]) - return config - -def save_user_config(user_config): - """Save the user configuration to USER_CONFIG_PATH.""" - os.makedirs(os.path.dirname(USER_CONFIG_PATH), exist_ok=True) - with open(USER_CONFIG_PATH, 'w') as f: - yaml.dump(user_config, f) - print(f"User configuration updated in {USER_CONFIG_PATH}.") - -def run_command(command, cwd=None, preview=False): - """Run a shell command in a given directory, or print it in preview mode. - - If the command fails, exit the program with the command's exit code. - """ - current_dir = cwd or os.getcwd() - if preview: - print(f"[Preview] In '{current_dir}': {command}") - else: - print(f"Running in '{current_dir}': {command}") - result = subprocess.run(command, cwd=cwd, shell=True, check=False) - if result.returncode != 0: - print(f"Command failed with exit code {result.returncode}. Exiting.") - sys.exit(result.returncode) - -def get_repo_identifier(repo, all_repos): - """ - Return a unique identifier for the repository. - If the repository name is unique among all_repos, return repository name; - otherwise, return 'provider/account/repository'. - """ - repo_name = repo.get("repository") - count = sum(1 for r in all_repos if r.get("repository") == repo_name) - if count == 1: - return repo_name - else: - return f'{repo.get("provider")}/{repo.get("account")}/{repo.get("repository")}' - -def resolve_repos(identifiers, all_repos): - """ - Given a list of identifier strings, return a list of repository configs. - The identifier can be: - - the full identifier "provider/account/repository" - - the repository name (if unique among all_repos) - - the alias (if defined) - """ - selected = [] - for ident in identifiers: - matches = [] - for repo in all_repos: - full_id = f'{repo.get("provider")}/{repo.get("account")}/{repo.get("repository")}' - if ident == full_id: - matches.append(repo) - elif ident == repo.get("alias"): - matches.append(repo) - elif ident == repo.get("repository"): - # Only match if repository name is unique among all_repos. - if sum(1 for r in all_repos if r.get("repository") == ident) == 1: - matches.append(repo) - if not matches: - print(f"Identifier '{ident}' did not match any repository in config.") - else: - selected.extend(matches) - return selected - -def filter_ignored(repos): - """Filter out repositories that have 'ignore' set to True.""" - return [r for r in repos if not r.get("ignore", False)] - -def generate_alias(repo, bin_dir, existing_aliases): - """ - Generate an alias for a repository based on its repository name. - - Steps: - 1. Keep only consonants from the repository name (letters from BCDFGHJKLMNPQRSTVWXYZ). - 2. Collapse consecutive identical consonants. - 3. Truncate to at most 12 characters. - 4. If that alias conflicts (already in existing_aliases or a file exists in bin_dir), - then prefix with the first letter of provider and account. - 5. If still conflicting, append a three-character hash until the alias is unique. - """ - repo_name = repo.get("repository") - # Keep only consonants. - consonants = re.sub(r"[^bcdfghjklmnpqrstvwxyzBCDFGHJKLMNPQRSTVWXYZ]", "", repo_name) - # Collapse consecutive identical consonants. - collapsed = re.sub(r"(.)\1+", r"\1", consonants) - base_alias = collapsed[:12] if len(collapsed) > 12 else collapsed - candidate = base_alias.lower() - - def conflict(alias): - alias_path = os.path.join(bin_dir, alias) - return alias in existing_aliases or os.path.exists(alias_path) - - if not conflict(candidate): - return candidate - - prefix = (repo.get("provider", "")[0] + repo.get("account", "")[0]).lower() - candidate2 = (prefix + candidate)[:12] - if not conflict(candidate2): - return candidate2 - - h = hashlib.md5(repo_name.encode("utf-8")).hexdigest()[:3] - candidate3 = (candidate2 + h)[:12] - while conflict(candidate3): - candidate3 += "x" - candidate3 = candidate3[:12] - return candidate3 - -def create_executable(repo, repositories_base_dir, bin_dir, all_repos, quiet=False, preview=False, no_verification=False): - """ - Create an executable bash wrapper for the repository. - - If 'verified' is set, the wrapper will checkout that commit and warn (unless quiet is True). - If no verified commit is set, a warning is printed unless quiet is True. - If an 'alias' field is provided, a symlink is created in bin_dir with that alias. - """ - repo_identifier = get_repo_identifier(repo, all_repos) - repo_dir = get_repo_dir(repositories_base_dir,repo) - command = repo.get("command") - if not command: - main_sh = os.path.join(repo_dir, "main.sh") - main_py = os.path.join(repo_dir, "main.py") - if os.path.exists(main_sh): - command = "bash main.sh" - elif os.path.exists(main_py): - command = "python3 main.py" - else: - if not quiet: - print(f"No command defined and no main.sh/main.py found in {repo_dir}. Skipping alias creation.") - return - - ORANGE = r"\033[38;5;208m" - RESET = r"\033[0m" - - if no_verification: - preamble = "" - else: - if verified := repo.get("verified"): - if not quiet: - preamble = f"""\ -git checkout {verified} || echo -e "{ORANGE}Warning: Failed to checkout commit {verified}.{RESET}" -CURRENT=$(git rev-parse HEAD) -if [ "$CURRENT" != "{verified}" ]; then - echo -e "{ORANGE}Warning: Current commit ($CURRENT) does not match verified commit ({verified}).{RESET}" -fi -""" - else: - preamble = "" - else: - preamble = "" if quiet else f'echo -e "{ORANGE}Warning: No verified commit set for this repository.{RESET}"' - - script_content = f"""#!/bin/bash -cd "{repo_dir}" -{preamble} -{command} "$@" -""" - alias_path = os.path.join(bin_dir, repo_identifier) - if preview: - print(f"[Preview] Would create executable '{alias_path}' with content:\n{script_content}") - else: - os.makedirs(bin_dir, exist_ok=True) - with open(alias_path, "w") as f: - f.write(script_content) - os.chmod(alias_path, 0o755) - if not quiet: - print(f"Installed executable for {repo_identifier} at {alias_path}") - - alias_name = repo.get("alias") - if alias_name: - alias_link_path = os.path.join(bin_dir, alias_name) - try: - if os.path.exists(alias_link_path) or os.path.islink(alias_link_path): - os.remove(alias_link_path) - os.symlink(alias_path, alias_link_path) - if not quiet: - print(f"Created alias '{alias_name}' pointing to {repo_identifier}") - except Exception as e: - if not quiet: - print(f"Error creating alias '{alias_name}': {e}") - -def install_repos(selected_repos, repositories_base_dir, bin_dir, all_repos:[], no_verification:bool, preview=False, quiet=False): - """Install repositories by creating executable wrappers and running setup.""" - for repo in selected_repos: - repo_identifier = get_repo_identifier(repo, all_repos) - repo_dir = get_repo_dir(repositories_base_dir,repo) - if not os.path.exists(repo_dir): - print(f"Repository directory '{repo_dir}' does not exist. Clone it first.") - continue - create_executable(repo, repositories_base_dir, bin_dir, all_repos, quiet=quiet, preview=preview, no_verification=no_verification) - setup_cmd = repo.get("setup") - if setup_cmd: - run_command(setup_cmd, cwd=repo_dir, preview=preview) - -def exec_git_command(selected_repos, repositories_base_dir, all_repos, git_cmd, extra_args, preview=False): - """Execute a given git command with extra arguments for each repository.""" - for repo in selected_repos: - repo_identifier = get_repo_identifier(repo, all_repos) - repo_dir = get_repo_dir(repositories_base_dir,repo) - if os.path.exists(repo_dir): - full_cmd = f"git {git_cmd} {' '.join(extra_args)}" - run_command(full_cmd, cwd=repo_dir, preview=preview) - else: - print(f"Repository directory '{repo_dir}' not found for {repo_identifier}.") - - -def status_repos(selected_repos, repositories_base_dir, all_repos, extra_args, list_only=False, system_status=False, preview=False): - if system_status: - print("System status:") - run_command("yay -Qu", preview=preview) - if list_only: - for repo in selected_repos: - print(get_repo_identifier(repo, all_repos)) - else: - exec_git_command(selected_repos, repositories_base_dir, all_repos, "status", extra_args, preview) - -def get_repo_dir(repositories_base_dir:str,repo:{})->str: - try: - return os.path.join(repositories_base_dir, repo.get("provider"), repo.get("account"), repo.get("repository")) - except TypeError as e: - if repositories_base_dir: - print(f"Error: {e} \nThe repository {repo} seems not correct configured.\nPlease configure it correct.") - for key in ["provider","account","repository"]: - if not repo.get(key,False): - print(f"Key '{key}' is missing.") - else: - print(f"Error: {e} \nThe base {base} seems not correct configured.\nPlease configure it correct.") - sys.exit(1) - -def clone_repos(selected_repos, repositories_base_dir: str, all_repos, preview=False): - import subprocess # ensure subprocess is imported - for repo in selected_repos: - repo_identifier = get_repo_identifier(repo, all_repos) - repo_dir = get_repo_dir(repositories_base_dir, repo) - if os.path.exists(repo_dir): - print(f"[INFO] Repository '{repo_identifier}' already exists at '{repo_dir}'. Skipping clone.") - continue - - parent_dir = os.path.dirname(repo_dir) - os.makedirs(parent_dir, exist_ok=True) - - # Attempt SSH clone first. - target = repo.get("replacement") if repo.get("replacement") else f"{repo.get('provider')}:{repo.get('account')}/{repo.get('repository')}" - clone_url = f"git@{target}.git" - print(f"[INFO] Attempting to clone '{repo_identifier}' using SSH from {clone_url} into '{repo_dir}'.") - - if preview: - print(f"[Preview] Would run: git clone {clone_url} {repo_dir} in {parent_dir}") - # Simulate a successful clone in preview mode. - result = subprocess.CompletedProcess(args=[], returncode=0) - else: - result = subprocess.run(f"git clone {clone_url} {repo_dir}", cwd=parent_dir, shell=True) - - # If SSH clone returns an error code, ask user whether to try HTTPS. - if result.returncode != 0: - print(f"[WARNING] SSH clone failed for '{repo_identifier}' with return code {result.returncode}.") - choice = input("Do you want to attempt HTTPS clone instead? (y/N): ").strip().lower() - if choice == 'y': - target = repo.get("replacement") if repo.get("replacement") else f"{repo.get('provider')}/{repo.get('account')}/{repo.get('repository')}" - clone_url = f"https://{target}.git" - print(f"[INFO] Attempting to clone '{repo_identifier}' using HTTPS from {clone_url} into '{repo_dir}'.") - if preview: - print(f"[Preview] Would run: git clone {clone_url} {repo_dir} in {parent_dir}") - else: - subprocess.run(f"git clone {clone_url} {repo_dir}", cwd=parent_dir, shell=True) - else: - print(f"[INFO] HTTPS clone not attempted for '{repo_identifier}'.") - -def deinstall_repos(selected_repos, repositories_base_dir, bin_dir, all_repos, preview=False): - for repo in selected_repos: - repo_identifier = get_repo_identifier(repo, all_repos) - alias_path = os.path.join(bin_dir, repo_identifier) - if os.path.exists(alias_path): - if preview: - print(f"[Preview] Would remove executable '{alias_path}'.") - else: - os.remove(alias_path) - print(f"Removed executable for {repo_identifier}.") - else: - print(f"No executable found for {repo_identifier} in {bin_dir}.") - teardown_cmd = repo.get("teardown") - repo_dir = get_repo_dir(repositories_base_dir,repo) - if teardown_cmd and os.path.exists(repo_dir): - run_command(teardown_cmd, cwd=repo_dir, preview=preview) - -def delete_repos(selected_repos, repositories_base_dir, all_repos, preview=False): - for repo in selected_repos: - repo_identifier = get_repo_identifier(repo, all_repos) - repo_dir = get_repo_dir(repositories_base_dir, repo) - if os.path.exists(repo_dir): - confirm = input(f"Are you sure you want to delete directory '{repo_dir}' for {repo_identifier}? [y/N]: ").strip().lower() - if confirm == "y": - if preview: - print(f"[Preview] Would delete directory '{repo_dir}' for {repo_identifier}.") - else: - try: - shutil.rmtree(repo_dir) - print(f"Deleted repository directory '{repo_dir}' for {repo_identifier}.") - except Exception as e: - print(f"Error deleting '{repo_dir}' for {repo_identifier}: {e}") - else: - print(f"Skipped deletion of '{repo_dir}' for {repo_identifier}.") - else: - print(f"Repository directory '{repo_dir}' not found for {repo_identifier}.") - -def update_repos(selected_repos, repositories_base_dir, bin_dir, all_repos:[], no_verification:bool, system_update=False, preview=False, quiet=False): - git_default_exec(selected_repos, repositories_base_dir, all_repos, extra_args=[],command="pull", preview=preview) - install_repos(selected_repos, repositories_base_dir, bin_dir, all_repos, no_verification, preview=preview, quiet=quiet) - if system_update: - run_command("yay -Syu", preview=preview) - run_command("sudo pacman -Syyu", preview=preview) - -def git_default_exec(selected_repos, repositories_base_dir, all_repos, extra_args, command:str, preview=False): - exec_git_command(selected_repos, repositories_base_dir, all_repos, command, extra_args, preview) - -def show_config(selected_repos, full_config=False): - """Display configuration for one or more repositories, or the entire merged config.""" - if full_config: - merged = load_config() - print(yaml.dump(merged, default_flow_style=False)) - else: - for repo in selected_repos: - identifier = f'{repo.get("provider")}/{repo.get("account")}/{repo.get("repository")}' - print(f"Repository: {identifier}") - for key, value in repo.items(): - print(f" {key}: {value}") - print("-" * 40) - -def interactive_add(config): - """Interactively prompt the user to add a new repository entry to the user config.""" - print("Adding a new repository configuration entry.") - new_entry = {} - new_entry["provider"] = input("Provider (e.g., github.com): ").strip() - new_entry["account"] = input("Account (e.g., yourusername): ").strip() - new_entry["repository"] = input("Repository name (e.g., mytool): ").strip() - new_entry["verified"] = input("Verified commit id: ").strip() - new_entry["command"] = input("Command (optional, leave blank to auto-detect): ").strip() - new_entry["description"] = input("Description (optional): ").strip() - new_entry["replacement"] = input("Replacement (optional): ").strip() - new_entry["setup"] = input("Setup command (optional): ").strip() - new_entry["teardown"] = input("Teardown command (optional): ").strip() - new_entry["alias"] = input("Alias (optional): ").strip() - # Allow the user to mark this entry as ignored. - ignore_val = input("Ignore this entry? (y/N): ").strip().lower() - if ignore_val == "y": - new_entry["ignore"] = True - - print("\nNew entry:") - for key, value in new_entry.items(): - if value: - print(f"{key}: {value}") - confirm = input("Add this entry to user config? (y/N): ").strip().lower() - if confirm == "y": - if os.path.exists(USER_CONFIG_PATH): - with open(USER_CONFIG_PATH, 'r') as f: - user_config = yaml.safe_load(f) or {} - else: - user_config = {"repositories": []} - user_config.setdefault("repositories", []) - user_config["repositories"].append(new_entry) - save_user_config(user_config) - else: - print("Entry not added.") - -def edit_config(): - """Open the user configuration file in nano.""" - run_command(f"nano {USER_CONFIG_PATH}") - -def config_init(user_config, defaults_config, bin_dir): - """ - Scan the base directory (defaults_config["base"]) for repositories. - The folder structure is assumed to be: - {base}/{provider}/{account}/{repository} - For each repository found, automatically determine: - - provider, account, repository from folder names. - - verified: the latest commit (via 'git log -1 --format=%H'). - - alias: generated from the repository name using generate_alias(). - Repositories already defined in defaults_config["repositories"] or user_config["repositories"] are skipped. - """ - repositories_base_dir = os.path.expanduser(defaults_config["directories"]["repositories"]) - if not os.path.isdir(repositories_base_dir): - print(f"Base directory '{repositories_base_dir}' does not exist.") - return - - default_keys = {(entry.get("provider"), entry.get("account"), entry.get("repository")) - for entry in defaults_config.get("repositories", [])} - existing_keys = {(entry.get("provider"), entry.get("account"), entry.get("repository")) - for entry in user_config.get("repositories", [])} - existing_aliases = {entry.get("alias") for entry in user_config.get("repositories", []) if entry.get("alias")} - - new_entries = [] - for provider in os.listdir(repositories_base_dir): - provider_path = os.path.join(repositories_base_dir, provider) - if not os.path.isdir(provider_path): - continue - for account in os.listdir(provider_path): - account_path = os.path.join(provider_path, account) - if not os.path.isdir(account_path): - continue - for repo_name in os.listdir(account_path): - repo_path = os.path.join(account_path, repo_name) - if not os.path.isdir(repo_path): - continue - key = (provider, account, repo_name) - if key in default_keys or key in existing_keys: - continue - try: - result = subprocess.run( - ["git", "log", "-1", "--format=%H"], - cwd=repo_path, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True, - check=True, - ) - verified = result.stdout.strip() - except Exception as e: - verified = "" - print(f"Could not determine latest commit for {repo_name} ({provider}/{account}): {e}") - - entry = { - "provider": provider, - "account": account, - "repository": repo_name, - "verified": verified, - "ignore": True - } - alias = generate_alias({"repository": repo_name, "provider": provider, "account": account}, bin_dir, existing_aliases) - entry["alias"] = alias - existing_aliases.add(alias) - new_entries.append(entry) - print(f"Adding new repo entry: {entry}") - - if new_entries: - user_config.setdefault("repositories", []).extend(new_entries) - save_user_config(user_config) - else: - print("No new repositories found.") - -def get_selected_repos(show_all:bool,all_repos_list,identifiers=None): - if show_all: - selected = all_repos_list - else: - selected = resolve_repos(identifiers, all_repos_list) - filtered = filter_ignored(selected) - if not selected: - print("Error: No repositories had been selected.") - sys.exit(1) - return filtered - -def list_repositories(all_repos, repositories_base_dir, bin_dir, search_filter="", status_filter=""): - """ - List all repositories with their attributes and status information. - - Parameters: - all_repos (list): List of repository configurations. - repositories_base_dir (str): The base directory where repositories are located. - bin_dir (str): The directory where executable wrappers are stored. - search_filter (str): Filter for repository attributes (case insensitive). - status_filter (str): Filter for computed status info (case insensitive). - - For each repository, the identifier is printed in bold, the description (if available) - in italic, then all other attributes and computed status are printed. - If the repository is installed, a hint is displayed under the attributes. - Repositories are filtered out if either the search_filter is not found in any attribute or - if the status_filter is not found in the computed status string. - """ - search_filter = search_filter.lower() if search_filter else "" - status_filter = status_filter.lower() if status_filter else "" - - # Define status colors using colors not used for other attributes: - # Avoid red (for ignore), blue (for homepage) and yellow (for verified). - status_colors = { - "Installed": "\033[1;32m", # Green - "Not Installed": "\033[1;35m", # Magenta - "Cloned": "\033[1;36m", # Cyan - "Clonable": "\033[1;37m", # White - "Ignored": "\033[38;5;208m", # Orange (extended) - "Active": "\033[38;5;129m", # Light Purple (extended) - "Installable": "\033[38;5;82m" # Light Green (extended) - } - - for repo in all_repos: - # Combine all attribute values into one string for filtering. - repo_text = " ".join(str(v) for v in repo.values()).lower() - if search_filter and search_filter not in repo_text: - continue - - # Compute status information for the repository. - identifier = get_repo_identifier(repo, all_repos) - executable_path = os.path.join(bin_dir, identifier) - repo_dir = get_repo_dir(repositories_base_dir, repo) - status_list = [] - - # Check if the executable exists (Installed). - if os.path.exists(executable_path): - status_list.append("Installed") - else: - status_list.append("Not Installed") - # Check if the repository directory exists (Cloned). - if os.path.exists(repo_dir): - status_list.append("Cloned") - else: - status_list.append("Clonable") - # Mark ignored repositories. - if repo.get("ignore", False): - status_list.append("Ignored") - else: - status_list.append("Active") - # Define installable as cloned but not installed. - if os.path.exists(repo_dir) and not os.path.exists(executable_path): - status_list.append("Installable") - - # Build a colored status string. - colored_statuses = [f"{status_colors.get(s, '')}{s}\033[0m" for s in status_list] - status_str = ", ".join(colored_statuses) - - # If a status_filter is provided, only display repos whose status contains the filter. - if status_filter and status_filter not in status_str.lower(): - continue - - # Display repository details: - # Print the identifier in bold. - print(f"\033[1m{identifier}\033[0m") - # Print the description in italic if it exists. - description = repo.get("description") - if description: - print(f"\n\033[3m{description}\033[0m") - print("\nAttributes:") - # Loop through all attributes. - for key, value in repo.items(): - formatted_value = str(value) - # Special formatting for "verified" attribute (yellow). - if key == "verified" and value: - formatted_value = f"\033[1;33m{value}\033[0m" - # Special formatting for "ignore" flag (red if True). - if key == "ignore" and value: - formatted_value = f"\033[1;31m{value}\033[0m" - if key == "description": - continue - # Highlight homepage in blue. - if key.lower() == "homepage" and value: - formatted_value = f"\033[1;34m{value}\033[0m" - print(f" {key}: {formatted_value}") - # Always display the computed status. - print(f" Status: {status_str}") - # If the repository is installed, display a hint for more info. - if os.path.exists(executable_path): - print(f"\nMore information and help: \033[1;4mpkgmgr {identifier} --help\033[0m\n") - print("-" * 40) # Main program. if __name__ == "__main__": - config_merged = load_config() + config_merged = load_config(USER_CONFIG_PATH) repositories_base_dir = os.path.expanduser(config_merged["directories"]["repositories"]) all_repos_list = config_merged["repositories"] description_text = """\ @@ -697,7 +147,7 @@ For detailed help on each command, use: if args.command == "clone": clone_repos(selected, repositories_base_dir, all_repos_list, args.preview) else: - git_default_exec(selected, repositories_base_dir, all_repos_list, args.extra_args, args.command, preview=args.preview) + exec_git_command(selected, repositories_base_dir, all_repos_list, args.command, args.extra_args, preview) elif args.command == "list": list_repositories(all_repos_list, repositories_base_dir, BIN_DIR, search_filter=args.search, status_filter=args.status) elif args.command == "deinstall": @@ -764,15 +214,16 @@ For detailed help on each command, use: elif args.command == "config": if args.subcommand == "show": if args.all or (not args.identifiers): - show_config([], full_config=True) + show_config([], USER_CONFIG_PATH, full_config=True) else: selected = resolve_repos(args.identifiers, all_repos_list) if selected: - show_config(selected, full_config=False) + show_config(selected, USER_CONFIG_PATH, full_config=False) elif args.subcommand == "add": interactive_add(config_merged) elif args.subcommand == "edit": - edit_config() + """Open the user configuration file in nano.""" + run_command(f"nano {USER_CONFIG_PATH}") elif args.subcommand == "init": if os.path.exists(USER_CONFIG_PATH): with open(USER_CONFIG_PATH, 'r') as f: diff --git a/pkgmgr/__init__.py b/pkgmgr/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pkgmgr/clone_repos.py b/pkgmgr/clone_repos.py new file mode 100644 index 0000000..9310bc5 --- /dev/null +++ b/pkgmgr/clone_repos.py @@ -0,0 +1,40 @@ +import subprocess +import os + +def clone_repos(selected_repos, repositories_base_dir: str, all_repos, preview=False): + for repo in selected_repos: + repo_identifier = get_repo_identifier(repo, all_repos) + repo_dir = get_repo_dir(repositories_base_dir, repo) + if os.path.exists(repo_dir): + print(f"[INFO] Repository '{repo_identifier}' already exists at '{repo_dir}'. Skipping clone.") + continue + + parent_dir = os.path.dirname(repo_dir) + os.makedirs(parent_dir, exist_ok=True) + + # Attempt SSH clone first. + target = repo.get("replacement") if repo.get("replacement") else f"{repo.get('provider')}:{repo.get('account')}/{repo.get('repository')}" + clone_url = f"git@{target}.git" + print(f"[INFO] Attempting to clone '{repo_identifier}' using SSH from {clone_url} into '{repo_dir}'.") + + if preview: + print(f"[Preview] Would run: git clone {clone_url} {repo_dir} in {parent_dir}") + # Simulate a successful clone in preview mode. + result = subprocess.CompletedProcess(args=[], returncode=0) + else: + result = subprocess.run(f"git clone {clone_url} {repo_dir}", cwd=parent_dir, shell=True) + + # If SSH clone returns an error code, ask user whether to try HTTPS. + if result.returncode != 0: + print(f"[WARNING] SSH clone failed for '{repo_identifier}' with return code {result.returncode}.") + choice = input("Do you want to attempt HTTPS clone instead? (y/N): ").strip().lower() + if choice == 'y': + target = repo.get("replacement") if repo.get("replacement") else f"{repo.get('provider')}/{repo.get('account')}/{repo.get('repository')}" + clone_url = f"https://{target}.git" + print(f"[INFO] Attempting to clone '{repo_identifier}' using HTTPS from {clone_url} into '{repo_dir}'.") + if preview: + print(f"[Preview] Would run: git clone {clone_url} {repo_dir} in {parent_dir}") + else: + subprocess.run(f"git clone {clone_url} {repo_dir}", cwd=parent_dir, shell=True) + else: + print(f"[INFO] HTTPS clone not attempted for '{repo_identifier}'.") \ No newline at end of file diff --git a/pkgmgr/config_init.py b/pkgmgr/config_init.py new file mode 100644 index 0000000..52ae7e7 --- /dev/null +++ b/pkgmgr/config_init.py @@ -0,0 +1,73 @@ +import subprocess +import os + +def config_init(user_config, defaults_config, bin_dir): + """ + Scan the base directory (defaults_config["base"]) for repositories. + The folder structure is assumed to be: + {base}/{provider}/{account}/{repository} + For each repository found, automatically determine: + - provider, account, repository from folder names. + - verified: the latest commit (via 'git log -1 --format=%H'). + - alias: generated from the repository name using generate_alias(). + Repositories already defined in defaults_config["repositories"] or user_config["repositories"] are skipped. + """ + repositories_base_dir = os.path.expanduser(defaults_config["directories"]["repositories"]) + if not os.path.isdir(repositories_base_dir): + print(f"Base directory '{repositories_base_dir}' does not exist.") + return + + default_keys = {(entry.get("provider"), entry.get("account"), entry.get("repository")) + for entry in defaults_config.get("repositories", [])} + existing_keys = {(entry.get("provider"), entry.get("account"), entry.get("repository")) + for entry in user_config.get("repositories", [])} + existing_aliases = {entry.get("alias") for entry in user_config.get("repositories", []) if entry.get("alias")} + + new_entries = [] + for provider in os.listdir(repositories_base_dir): + provider_path = os.path.join(repositories_base_dir, provider) + if not os.path.isdir(provider_path): + continue + for account in os.listdir(provider_path): + account_path = os.path.join(provider_path, account) + if not os.path.isdir(account_path): + continue + for repo_name in os.listdir(account_path): + repo_path = os.path.join(account_path, repo_name) + if not os.path.isdir(repo_path): + continue + key = (provider, account, repo_name) + if key in default_keys or key in existing_keys: + continue + try: + result = subprocess.run( + ["git", "log", "-1", "--format=%H"], + cwd=repo_path, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + check=True, + ) + verified = result.stdout.strip() + except Exception as e: + verified = "" + print(f"Could not determine latest commit for {repo_name} ({provider}/{account}): {e}") + + entry = { + "provider": provider, + "account": account, + "repository": repo_name, + "verified": verified, + "ignore": True + } + alias = generate_alias({"repository": repo_name, "provider": provider, "account": account}, bin_dir, existing_aliases) + entry["alias"] = alias + existing_aliases.add(alias) + new_entries.append(entry) + print(f"Adding new repo entry: {entry}") + + if new_entries: + user_config.setdefault("repositories", []).extend(new_entries) + save_user_config(user_config) + else: + print("No new repositories found.") diff --git a/pkgmgr/create_executable.py b/pkgmgr/create_executable.py new file mode 100644 index 0000000..933a2f8 --- /dev/null +++ b/pkgmgr/create_executable.py @@ -0,0 +1,73 @@ +import os + +def create_executable(repo, repositories_base_dir, bin_dir, all_repos, quiet=False, preview=False, no_verification=False): + """ + Create an executable bash wrapper for the repository. + + If 'verified' is set, the wrapper will checkout that commit and warn (unless quiet is True). + If no verified commit is set, a warning is printed unless quiet is True. + If an 'alias' field is provided, a symlink is created in bin_dir with that alias. + """ + repo_identifier = get_repo_identifier(repo, all_repos) + repo_dir = get_repo_dir(repositories_base_dir,repo) + command = repo.get("command") + if not command: + main_sh = os.path.join(repo_dir, "main.sh") + main_py = os.path.join(repo_dir, "main.py") + if os.path.exists(main_sh): + command = "bash main.sh" + elif os.path.exists(main_py): + command = "python3 main.py" + else: + if not quiet: + print(f"No command defined and no main.sh/main.py found in {repo_dir}. Skipping alias creation.") + return + + ORANGE = r"\033[38;5;208m" + RESET = r"\033[0m" + + if no_verification: + preamble = "" + else: + if verified := repo.get("verified"): + if not quiet: + preamble = f"""\ +git checkout {verified} || echo -e "{ORANGE}Warning: Failed to checkout commit {verified}.{RESET}" +CURRENT=$(git rev-parse HEAD) +if [ "$CURRENT" != "{verified}" ]; then + echo -e "{ORANGE}Warning: Current commit ($CURRENT) does not match verified commit ({verified}).{RESET}" +fi +""" + else: + preamble = "" + else: + preamble = "" if quiet else f'echo -e "{ORANGE}Warning: No verified commit set for this repository.{RESET}"' + + script_content = f"""#!/bin/bash +cd "{repo_dir}" +{preamble} +{command} "$@" +""" + alias_path = os.path.join(bin_dir, repo_identifier) + if preview: + print(f"[Preview] Would create executable '{alias_path}' with content:\n{script_content}") + else: + os.makedirs(bin_dir, exist_ok=True) + with open(alias_path, "w") as f: + f.write(script_content) + os.chmod(alias_path, 0o755) + if not quiet: + print(f"Installed executable for {repo_identifier} at {alias_path}") + + alias_name = repo.get("alias") + if alias_name: + alias_link_path = os.path.join(bin_dir, alias_name) + try: + if os.path.exists(alias_link_path) or os.path.islink(alias_link_path): + os.remove(alias_link_path) + os.symlink(alias_path, alias_link_path) + if not quiet: + print(f"Created alias '{alias_name}' pointing to {repo_identifier}") + except Exception as e: + if not quiet: + print(f"Error creating alias '{alias_name}': {e}") \ No newline at end of file diff --git a/pkgmgr/deinstall_repos.py b/pkgmgr/deinstall_repos.py new file mode 100644 index 0000000..78241fd --- /dev/null +++ b/pkgmgr/deinstall_repos.py @@ -0,0 +1,18 @@ +import os + +def deinstall_repos(selected_repos, repositories_base_dir, bin_dir, all_repos, preview=False): + for repo in selected_repos: + repo_identifier = get_repo_identifier(repo, all_repos) + alias_path = os.path.join(bin_dir, repo_identifier) + if os.path.exists(alias_path): + if preview: + print(f"[Preview] Would remove executable '{alias_path}'.") + else: + os.remove(alias_path) + print(f"Removed executable for {repo_identifier}.") + else: + print(f"No executable found for {repo_identifier} in {bin_dir}.") + teardown_cmd = repo.get("teardown") + repo_dir = get_repo_dir(repositories_base_dir,repo) + if teardown_cmd and os.path.exists(repo_dir): + run_command(teardown_cmd, cwd=repo_dir, preview=preview) \ No newline at end of file diff --git a/pkgmgr/delete_repos.py b/pkgmgr/delete_repos.py new file mode 100644 index 0000000..1fe5435 --- /dev/null +++ b/pkgmgr/delete_repos.py @@ -0,0 +1,22 @@ +import shutil +import os + +def delete_repos(selected_repos, repositories_base_dir, all_repos, preview=False): + for repo in selected_repos: + repo_identifier = get_repo_identifier(repo, all_repos) + repo_dir = get_repo_dir(repositories_base_dir, repo) + if os.path.exists(repo_dir): + confirm = input(f"Are you sure you want to delete directory '{repo_dir}' for {repo_identifier}? [y/N]: ").strip().lower() + if confirm == "y": + if preview: + print(f"[Preview] Would delete directory '{repo_dir}' for {repo_identifier}.") + else: + try: + shutil.rmtree(repo_dir) + print(f"Deleted repository directory '{repo_dir}' for {repo_identifier}.") + except Exception as e: + print(f"Error deleting '{repo_dir}' for {repo_identifier}: {e}") + else: + print(f"Skipped deletion of '{repo_dir}' for {repo_identifier}.") + else: + print(f"Repository directory '{repo_dir}' not found for {repo_identifier}.") \ No newline at end of file diff --git a/pkgmgr/exec_git_command.py b/pkgmgr/exec_git_command.py new file mode 100644 index 0000000..53ad7bd --- /dev/null +++ b/pkgmgr/exec_git_command.py @@ -0,0 +1,12 @@ +import os + +def exec_git_command(selected_repos, repositories_base_dir, all_repos, git_cmd, extra_args, preview=False): + """Execute a given git command with extra arguments for each repository.""" + for repo in selected_repos: + repo_identifier = get_repo_identifier(repo, all_repos) + repo_dir = get_repo_dir(repositories_base_dir,repo) + if os.path.exists(repo_dir): + full_cmd = f"git {git_cmd} {' '.join(extra_args)}" + run_command(full_cmd, cwd=repo_dir, preview=preview) + else: + print(f"Repository directory '{repo_dir}' not found for {repo_identifier}.") \ No newline at end of file diff --git a/pkgmgr/filter_ignored.py b/pkgmgr/filter_ignored.py new file mode 100644 index 0000000..75f911b --- /dev/null +++ b/pkgmgr/filter_ignored.py @@ -0,0 +1,3 @@ +def filter_ignored(repos): + """Filter out repositories that have 'ignore' set to True.""" + return [r for r in repos if not r.get("ignore", False)] \ No newline at end of file diff --git a/pkgmgr/generate_alias.py b/pkgmgr/generate_alias.py new file mode 100644 index 0000000..3c0996a --- /dev/null +++ b/pkgmgr/generate_alias.py @@ -0,0 +1,42 @@ +import os +import hashlib +import re + +def generate_alias(repo, bin_dir, existing_aliases): + """ + Generate an alias for a repository based on its repository name. + + Steps: + 1. Keep only consonants from the repository name (letters from BCDFGHJKLMNPQRSTVWXYZ). + 2. Collapse consecutive identical consonants. + 3. Truncate to at most 12 characters. + 4. If that alias conflicts (already in existing_aliases or a file exists in bin_dir), + then prefix with the first letter of provider and account. + 5. If still conflicting, append a three-character hash until the alias is unique. + """ + repo_name = repo.get("repository") + # Keep only consonants. + consonants = re.sub(r"[^bcdfghjklmnpqrstvwxyzBCDFGHJKLMNPQRSTVWXYZ]", "", repo_name) + # Collapse consecutive identical consonants. + collapsed = re.sub(r"(.)\1+", r"\1", consonants) + base_alias = collapsed[:12] if len(collapsed) > 12 else collapsed + candidate = base_alias.lower() + + def conflict(alias): + alias_path = os.path.join(bin_dir, alias) + return alias in existing_aliases or os.path.exists(alias_path) + + if not conflict(candidate): + return candidate + + prefix = (repo.get("provider", "")[0] + repo.get("account", "")[0]).lower() + candidate2 = (prefix + candidate)[:12] + if not conflict(candidate2): + return candidate2 + + h = hashlib.md5(repo_name.encode("utf-8")).hexdigest()[:3] + candidate3 = (candidate2 + h)[:12] + while conflict(candidate3): + candidate3 += "x" + candidate3 = candidate3[:12] + return candidate3 \ No newline at end of file diff --git a/pkgmgr/get_repo_dir.py b/pkgmgr/get_repo_dir.py new file mode 100644 index 0000000..e154aea --- /dev/null +++ b/pkgmgr/get_repo_dir.py @@ -0,0 +1,15 @@ +import sys +import os + +def get_repo_dir(repositories_base_dir:str,repo:{})->str: + try: + return os.path.join(repositories_base_dir, repo.get("provider"), repo.get("account"), repo.get("repository")) + except TypeError as e: + if repositories_base_dir: + print(f"Error: {e} \nThe repository {repo} seems not correct configured.\nPlease configure it correct.") + for key in ["provider","account","repository"]: + if not repo.get(key,False): + print(f"Key '{key}' is missing.") + else: + print(f"Error: {e} \nThe base {base} seems not correct configured.\nPlease configure it correct.") + sys.exit(1) \ No newline at end of file diff --git a/pkgmgr/get_repo_identifier.py b/pkgmgr/get_repo_identifier.py new file mode 100644 index 0000000..e18a5e8 --- /dev/null +++ b/pkgmgr/get_repo_identifier.py @@ -0,0 +1,12 @@ +def get_repo_identifier(repo, all_repos): + """ + Return a unique identifier for the repository. + If the repository name is unique among all_repos, return repository name; + otherwise, return 'provider/account/repository'. + """ + repo_name = repo.get("repository") + count = sum(1 for r in all_repos if r.get("repository") == repo_name) + if count == 1: + return repo_name + else: + return f'{repo.get("provider")}/{repo.get("account")}/{repo.get("repository")}' \ No newline at end of file diff --git a/pkgmgr/get_selected_repos.py b/pkgmgr/get_selected_repos.py new file mode 100644 index 0000000..cff8e7d --- /dev/null +++ b/pkgmgr/get_selected_repos.py @@ -0,0 +1,14 @@ +import sys +from .resolve_repos import resolve_repos +from .filter_ignored import filter_ignored + +def get_selected_repos(show_all:bool,all_repos_list,identifiers=None): + if show_all: + selected = all_repos_list + else: + selected = resolve_repos(identifiers, all_repos_list) + filtered = filter_ignored(selected) + if not selected: + print("Error: No repositories had been selected.") + sys.exit(1) + return filtered \ No newline at end of file diff --git a/pkgmgr/install_repos.py b/pkgmgr/install_repos.py new file mode 100644 index 0000000..c345742 --- /dev/null +++ b/pkgmgr/install_repos.py @@ -0,0 +1,14 @@ +import os + +def install_repos(selected_repos, repositories_base_dir, bin_dir, all_repos:[], no_verification:bool, preview=False, quiet=False): + """Install repositories by creating executable wrappers and running setup.""" + for repo in selected_repos: + repo_identifier = get_repo_identifier(repo, all_repos) + repo_dir = get_repo_dir(repositories_base_dir,repo) + if not os.path.exists(repo_dir): + print(f"Repository directory '{repo_dir}' does not exist. Clone it first.") + continue + create_executable(repo, repositories_base_dir, bin_dir, all_repos, quiet=quiet, preview=preview, no_verification=no_verification) + setup_cmd = repo.get("setup") + if setup_cmd: + run_command(setup_cmd, cwd=repo_dir, preview=preview) \ No newline at end of file diff --git a/pkgmgr/interactive_add.py b/pkgmgr/interactive_add.py new file mode 100644 index 0000000..ddd8159 --- /dev/null +++ b/pkgmgr/interactive_add.py @@ -0,0 +1,38 @@ +import yaml +import os + +def interactive_add(config): + """Interactively prompt the user to add a new repository entry to the user config.""" + print("Adding a new repository configuration entry.") + new_entry = {} + new_entry["provider"] = input("Provider (e.g., github.com): ").strip() + new_entry["account"] = input("Account (e.g., yourusername): ").strip() + new_entry["repository"] = input("Repository name (e.g., mytool): ").strip() + new_entry["verified"] = input("Verified commit id: ").strip() + new_entry["command"] = input("Command (optional, leave blank to auto-detect): ").strip() + new_entry["description"] = input("Description (optional): ").strip() + new_entry["replacement"] = input("Replacement (optional): ").strip() + new_entry["setup"] = input("Setup command (optional): ").strip() + new_entry["teardown"] = input("Teardown command (optional): ").strip() + new_entry["alias"] = input("Alias (optional): ").strip() + # Allow the user to mark this entry as ignored. + ignore_val = input("Ignore this entry? (y/N): ").strip().lower() + if ignore_val == "y": + new_entry["ignore"] = True + + print("\nNew entry:") + for key, value in new_entry.items(): + if value: + print(f"{key}: {value}") + confirm = input("Add this entry to user config? (y/N): ").strip().lower() + if confirm == "y": + if os.path.exists(USER_CONFIG_PATH): + with open(USER_CONFIG_PATH, 'r') as f: + user_config = yaml.safe_load(f) or {} + else: + user_config = {"repositories": []} + user_config.setdefault("repositories", []) + user_config["repositories"].append(new_entry) + save_user_config(user_config) + else: + print("Entry not added.") \ No newline at end of file diff --git a/pkgmgr/list_repositories.py b/pkgmgr/list_repositories.py new file mode 100644 index 0000000..3b6f269 --- /dev/null +++ b/pkgmgr/list_repositories.py @@ -0,0 +1,102 @@ +import os + +def list_repositories(all_repos, repositories_base_dir, bin_dir, search_filter="", status_filter=""): + """ + List all repositories with their attributes and status information. + + Parameters: + all_repos (list): List of repository configurations. + repositories_base_dir (str): The base directory where repositories are located. + bin_dir (str): The directory where executable wrappers are stored. + search_filter (str): Filter for repository attributes (case insensitive). + status_filter (str): Filter for computed status info (case insensitive). + + For each repository, the identifier is printed in bold, the description (if available) + in italic, then all other attributes and computed status are printed. + If the repository is installed, a hint is displayed under the attributes. + Repositories are filtered out if either the search_filter is not found in any attribute or + if the status_filter is not found in the computed status string. + """ + search_filter = search_filter.lower() if search_filter else "" + status_filter = status_filter.lower() if status_filter else "" + + # Define status colors using colors not used for other attributes: + # Avoid red (for ignore), blue (for homepage) and yellow (for verified). + status_colors = { + "Installed": "\033[1;32m", # Green + "Not Installed": "\033[1;35m", # Magenta + "Cloned": "\033[1;36m", # Cyan + "Clonable": "\033[1;37m", # White + "Ignored": "\033[38;5;208m", # Orange (extended) + "Active": "\033[38;5;129m", # Light Purple (extended) + "Installable": "\033[38;5;82m" # Light Green (extended) + } + + for repo in all_repos: + # Combine all attribute values into one string for filtering. + repo_text = " ".join(str(v) for v in repo.values()).lower() + if search_filter and search_filter not in repo_text: + continue + + # Compute status information for the repository. + identifier = get_repo_identifier(repo, all_repos) + executable_path = os.path.join(bin_dir, identifier) + repo_dir = get_repo_dir(repositories_base_dir, repo) + status_list = [] + + # Check if the executable exists (Installed). + if os.path.exists(executable_path): + status_list.append("Installed") + else: + status_list.append("Not Installed") + # Check if the repository directory exists (Cloned). + if os.path.exists(repo_dir): + status_list.append("Cloned") + else: + status_list.append("Clonable") + # Mark ignored repositories. + if repo.get("ignore", False): + status_list.append("Ignored") + else: + status_list.append("Active") + # Define installable as cloned but not installed. + if os.path.exists(repo_dir) and not os.path.exists(executable_path): + status_list.append("Installable") + + # Build a colored status string. + colored_statuses = [f"{status_colors.get(s, '')}{s}\033[0m" for s in status_list] + status_str = ", ".join(colored_statuses) + + # If a status_filter is provided, only display repos whose status contains the filter. + if status_filter and status_filter not in status_str.lower(): + continue + + # Display repository details: + # Print the identifier in bold. + print(f"\033[1m{identifier}\033[0m") + # Print the description in italic if it exists. + description = repo.get("description") + if description: + print(f"\n\033[3m{description}\033[0m") + print("\nAttributes:") + # Loop through all attributes. + for key, value in repo.items(): + formatted_value = str(value) + # Special formatting for "verified" attribute (yellow). + if key == "verified" and value: + formatted_value = f"\033[1;33m{value}\033[0m" + # Special formatting for "ignore" flag (red if True). + if key == "ignore" and value: + formatted_value = f"\033[1;31m{value}\033[0m" + if key == "description": + continue + # Highlight homepage in blue. + if key.lower() == "homepage" and value: + formatted_value = f"\033[1;34m{value}\033[0m" + print(f" {key}: {formatted_value}") + # Always display the computed status. + print(f" Status: {status_str}") + # If the repository is installed, display a hint for more info. + if os.path.exists(executable_path): + print(f"\nMore information and help: \033[1;4mpkgmgr {identifier} --help\033[0m\n") + print("-" * 40) \ No newline at end of file diff --git a/pkgmgr/load_config.py b/pkgmgr/load_config.py new file mode 100644 index 0000000..84595b7 --- /dev/null +++ b/pkgmgr/load_config.py @@ -0,0 +1,25 @@ +import sys +import yaml +import os + +DEFAULT_CONFIG_PATH = os.path.join("config", "defaults.yaml") + +def load_config(user_config_path): + """Load configuration from defaults and merge in user config if present.""" + if not os.path.exists(DEFAULT_CONFIG_PATH): + print(f"Default configuration file '{DEFAULT_CONFIG_PATH}' not found.") + sys.exit(1) + with open(DEFAULT_CONFIG_PATH, 'r') as f: + config = yaml.safe_load(f) + if "directories" not in config or "repositories" not in config: + print("Default config file must contain 'directories' and 'repositories' keys.") + sys.exit(1) + if os.path.exists(user_config_path): + with open(user_config_path, 'r') as f: + user_config = yaml.safe_load(f) + if user_config: + if "directories" in user_config: + config["directories"] = user_config["directories"] + if "repositories" in user_config: + config["repositories"].extend(user_config["repositories"]) + return config \ No newline at end of file diff --git a/pkgmgr/resolve_repos.py b/pkgmgr/resolve_repos.py new file mode 100644 index 0000000..177ae89 --- /dev/null +++ b/pkgmgr/resolve_repos.py @@ -0,0 +1,28 @@ +import os + +def resolve_repos(identifiers, all_repos): + """ + Given a list of identifier strings, return a list of repository configs. + The identifier can be: + - the full identifier "provider/account/repository" + - the repository name (if unique among all_repos) + - the alias (if defined) + """ + selected = [] + for ident in identifiers: + matches = [] + for repo in all_repos: + full_id = f'{repo.get("provider")}/{repo.get("account")}/{repo.get("repository")}' + if ident == full_id: + matches.append(repo) + elif ident == repo.get("alias"): + matches.append(repo) + elif ident == repo.get("repository"): + # Only match if repository name is unique among all_repos. + if sum(1 for r in all_repos if r.get("repository") == ident) == 1: + matches.append(repo) + if not matches: + print(f"Identifier '{ident}' did not match any repository in config.") + else: + selected.extend(matches) + return selected \ No newline at end of file diff --git a/pkgmgr/run_command.py b/pkgmgr/run_command.py new file mode 100644 index 0000000..180fc8e --- /dev/null +++ b/pkgmgr/run_command.py @@ -0,0 +1,18 @@ +import sys +import subprocess +import os + +def run_command(command, cwd=None, preview=False): + """Run a shell command in a given directory, or print it in preview mode. + + If the command fails, exit the program with the command's exit code. + """ + current_dir = cwd or os.getcwd() + if preview: + print(f"[Preview] In '{current_dir}': {command}") + else: + print(f"Running in '{current_dir}': {command}") + result = subprocess.run(command, cwd=cwd, shell=True, check=False) + if result.returncode != 0: + print(f"Command failed with exit code {result.returncode}. Exiting.") + sys.exit(result.returncode) \ No newline at end of file diff --git a/pkgmgr/save_user_config.py b/pkgmgr/save_user_config.py new file mode 100644 index 0000000..49657d4 --- /dev/null +++ b/pkgmgr/save_user_config.py @@ -0,0 +1,9 @@ +import yaml +import os + +def save_user_config(user_config): + """Save the user configuration to USER_CONFIG_PATH.""" + os.makedirs(os.path.dirname(USER_CONFIG_PATH), exist_ok=True) + with open(USER_CONFIG_PATH, 'w') as f: + yaml.dump(user_config, f) + print(f"User configuration updated in {USER_CONFIG_PATH}.") \ No newline at end of file diff --git a/pkgmgr/show_config.py b/pkgmgr/show_config.py new file mode 100644 index 0000000..7e7d77f --- /dev/null +++ b/pkgmgr/show_config.py @@ -0,0 +1,14 @@ +import yaml + +def show_config(selected_repos, user_config_path, full_config=False): + """Display configuration for one or more repositories, or the entire merged config.""" + if full_config: + merged = load_config(user_config_path) + print(yaml.dump(merged, default_flow_style=False)) + else: + for repo in selected_repos: + identifier = f'{repo.get("provider")}/{repo.get("account")}/{repo.get("repository")}' + print(f"Repository: {identifier}") + for key, value in repo.items(): + print(f" {key}: {value}") + print("-" * 40) \ No newline at end of file diff --git a/pkgmgr/status_repos.py b/pkgmgr/status_repos.py new file mode 100644 index 0000000..6642313 --- /dev/null +++ b/pkgmgr/status_repos.py @@ -0,0 +1,11 @@ +import sys + +def status_repos(selected_repos, repositories_base_dir, all_repos, extra_args, list_only=False, system_status=False, preview=False): + if system_status: + print("System status:") + run_command("yay -Qu", preview=preview) + if list_only: + for repo in selected_repos: + print(get_repo_identifier(repo, all_repos)) + else: + exec_git_command(selected_repos, repositories_base_dir, all_repos, "status", extra_args, preview) \ No newline at end of file diff --git a/pkgmgr/update_repos.py b/pkgmgr/update_repos.py new file mode 100644 index 0000000..f3f589f --- /dev/null +++ b/pkgmgr/update_repos.py @@ -0,0 +1,8 @@ +import sys + +def update_repos(selected_repos, repositories_base_dir, bin_dir, all_repos:[], no_verification:bool, system_update=False, preview=False, quiet=False): + git_default_exec(selected_repos, repositories_base_dir, all_repos, extra_args=[],command="pull", preview=preview) + install_repos(selected_repos, repositories_base_dir, bin_dir, all_repos, no_verification, preview=preview, quiet=quiet) + if system_update: + run_command("yay -Syu", preview=preview) + run_command("sudo pacman -Syyu", preview=preview) \ No newline at end of file