Refactored main.py into function files

This commit is contained in:
Kevin Veen-Birkenbach
2025-03-06 11:10:11 +01:00
parent 9f95d34b9c
commit fb9ea2e24b
23 changed files with 619 additions and 577 deletions

605
main.py
View File

@@ -1,18 +1,34 @@
import re
import hashlib
import os
import subprocess
import shutil
import sys
import yaml
import argparse
import json
# Define configuration file paths.
DEFAULT_CONFIG_PATH = os.path.join("config", "defaults.yaml")
USER_CONFIG_PATH = os.path.join("config", "config.yaml")
BIN_DIR = os.path.expanduser("~/.local/bin")
from pkgmgr.clone_repos import clone_repos
from pkgmgr.config_init import config_init
from pkgmgr.create_executable import create_executable
from pkgmgr.deinstall_repos import deinstall_repos
from pkgmgr.delete_repos import delete_repos
from pkgmgr.exec_git_command import exec_git_command
from pkgmgr.filter_ignored import filter_ignored
from pkgmgr.generate_alias import generate_alias
from pkgmgr.get_repo_dir import get_repo_dir
from pkgmgr.get_repo_identifier import get_repo_identifier
from pkgmgr.get_selected_repos import get_selected_repos
from pkgmgr.install_repos import install_repos
from pkgmgr.interactive_add import interactive_add
from pkgmgr.list_repositories import list_repositories
from pkgmgr.load_config import load_config
from pkgmgr.resolve_repos import resolve_repos
from pkgmgr.run_command import run_command
from pkgmgr.save_user_config import save_user_config
from pkgmgr.show_config import show_config
from pkgmgr.status_repos import status_repos
from pkgmgr.update_repos import update_repos
# Commands proxied by package-manager
GIT_DEFAULT_COMMANDS = [
"pull",
@@ -27,575 +43,9 @@ GIT_DEFAULT_COMMANDS = [
"commit"
]
def load_config():
"""Load configuration from defaults and merge in user config if present."""
if not os.path.exists(DEFAULT_CONFIG_PATH):
print(f"Default configuration file '{DEFAULT_CONFIG_PATH}' not found.")
sys.exit(1)
with open(DEFAULT_CONFIG_PATH, 'r') as f:
config = yaml.safe_load(f)
if "directories" not in config or "repositories" not in config:
print("Default config file must contain 'directories' and 'repositories' keys.")
sys.exit(1)
if os.path.exists(USER_CONFIG_PATH):
with open(USER_CONFIG_PATH, 'r') as f:
user_config = yaml.safe_load(f)
if user_config:
if "directories" in user_config:
config["directories"] = user_config["directories"]
if "repositories" in user_config:
config["repositories"].extend(user_config["repositories"])
return config
def save_user_config(user_config):
"""Save the user configuration to USER_CONFIG_PATH."""
os.makedirs(os.path.dirname(USER_CONFIG_PATH), exist_ok=True)
with open(USER_CONFIG_PATH, 'w') as f:
yaml.dump(user_config, f)
print(f"User configuration updated in {USER_CONFIG_PATH}.")
def run_command(command, cwd=None, preview=False):
"""Run a shell command in a given directory, or print it in preview mode.
If the command fails, exit the program with the command's exit code.
"""
current_dir = cwd or os.getcwd()
if preview:
print(f"[Preview] In '{current_dir}': {command}")
else:
print(f"Running in '{current_dir}': {command}")
result = subprocess.run(command, cwd=cwd, shell=True, check=False)
if result.returncode != 0:
print(f"Command failed with exit code {result.returncode}. Exiting.")
sys.exit(result.returncode)
def get_repo_identifier(repo, all_repos):
"""
Return a unique identifier for the repository.
If the repository name is unique among all_repos, return repository name;
otherwise, return 'provider/account/repository'.
"""
repo_name = repo.get("repository")
count = sum(1 for r in all_repos if r.get("repository") == repo_name)
if count == 1:
return repo_name
else:
return f'{repo.get("provider")}/{repo.get("account")}/{repo.get("repository")}'
def resolve_repos(identifiers, all_repos):
"""
Given a list of identifier strings, return a list of repository configs.
The identifier can be:
- the full identifier "provider/account/repository"
- the repository name (if unique among all_repos)
- the alias (if defined)
"""
selected = []
for ident in identifiers:
matches = []
for repo in all_repos:
full_id = f'{repo.get("provider")}/{repo.get("account")}/{repo.get("repository")}'
if ident == full_id:
matches.append(repo)
elif ident == repo.get("alias"):
matches.append(repo)
elif ident == repo.get("repository"):
# Only match if repository name is unique among all_repos.
if sum(1 for r in all_repos if r.get("repository") == ident) == 1:
matches.append(repo)
if not matches:
print(f"Identifier '{ident}' did not match any repository in config.")
else:
selected.extend(matches)
return selected
def filter_ignored(repos):
"""Filter out repositories that have 'ignore' set to True."""
return [r for r in repos if not r.get("ignore", False)]
def generate_alias(repo, bin_dir, existing_aliases):
"""
Generate an alias for a repository based on its repository name.
Steps:
1. Keep only consonants from the repository name (letters from BCDFGHJKLMNPQRSTVWXYZ).
2. Collapse consecutive identical consonants.
3. Truncate to at most 12 characters.
4. If that alias conflicts (already in existing_aliases or a file exists in bin_dir),
then prefix with the first letter of provider and account.
5. If still conflicting, append a three-character hash until the alias is unique.
"""
repo_name = repo.get("repository")
# Keep only consonants.
consonants = re.sub(r"[^bcdfghjklmnpqrstvwxyzBCDFGHJKLMNPQRSTVWXYZ]", "", repo_name)
# Collapse consecutive identical consonants.
collapsed = re.sub(r"(.)\1+", r"\1", consonants)
base_alias = collapsed[:12] if len(collapsed) > 12 else collapsed
candidate = base_alias.lower()
def conflict(alias):
alias_path = os.path.join(bin_dir, alias)
return alias in existing_aliases or os.path.exists(alias_path)
if not conflict(candidate):
return candidate
prefix = (repo.get("provider", "")[0] + repo.get("account", "")[0]).lower()
candidate2 = (prefix + candidate)[:12]
if not conflict(candidate2):
return candidate2
h = hashlib.md5(repo_name.encode("utf-8")).hexdigest()[:3]
candidate3 = (candidate2 + h)[:12]
while conflict(candidate3):
candidate3 += "x"
candidate3 = candidate3[:12]
return candidate3
def create_executable(repo, repositories_base_dir, bin_dir, all_repos, quiet=False, preview=False, no_verification=False):
"""
Create an executable bash wrapper for the repository.
If 'verified' is set, the wrapper will checkout that commit and warn (unless quiet is True).
If no verified commit is set, a warning is printed unless quiet is True.
If an 'alias' field is provided, a symlink is created in bin_dir with that alias.
"""
repo_identifier = get_repo_identifier(repo, all_repos)
repo_dir = get_repo_dir(repositories_base_dir,repo)
command = repo.get("command")
if not command:
main_sh = os.path.join(repo_dir, "main.sh")
main_py = os.path.join(repo_dir, "main.py")
if os.path.exists(main_sh):
command = "bash main.sh"
elif os.path.exists(main_py):
command = "python3 main.py"
else:
if not quiet:
print(f"No command defined and no main.sh/main.py found in {repo_dir}. Skipping alias creation.")
return
ORANGE = r"\033[38;5;208m"
RESET = r"\033[0m"
if no_verification:
preamble = ""
else:
if verified := repo.get("verified"):
if not quiet:
preamble = f"""\
git checkout {verified} || echo -e "{ORANGE}Warning: Failed to checkout commit {verified}.{RESET}"
CURRENT=$(git rev-parse HEAD)
if [ "$CURRENT" != "{verified}" ]; then
echo -e "{ORANGE}Warning: Current commit ($CURRENT) does not match verified commit ({verified}).{RESET}"
fi
"""
else:
preamble = ""
else:
preamble = "" if quiet else f'echo -e "{ORANGE}Warning: No verified commit set for this repository.{RESET}"'
script_content = f"""#!/bin/bash
cd "{repo_dir}"
{preamble}
{command} "$@"
"""
alias_path = os.path.join(bin_dir, repo_identifier)
if preview:
print(f"[Preview] Would create executable '{alias_path}' with content:\n{script_content}")
else:
os.makedirs(bin_dir, exist_ok=True)
with open(alias_path, "w") as f:
f.write(script_content)
os.chmod(alias_path, 0o755)
if not quiet:
print(f"Installed executable for {repo_identifier} at {alias_path}")
alias_name = repo.get("alias")
if alias_name:
alias_link_path = os.path.join(bin_dir, alias_name)
try:
if os.path.exists(alias_link_path) or os.path.islink(alias_link_path):
os.remove(alias_link_path)
os.symlink(alias_path, alias_link_path)
if not quiet:
print(f"Created alias '{alias_name}' pointing to {repo_identifier}")
except Exception as e:
if not quiet:
print(f"Error creating alias '{alias_name}': {e}")
def install_repos(selected_repos, repositories_base_dir, bin_dir, all_repos:[], no_verification:bool, preview=False, quiet=False):
"""Install repositories by creating executable wrappers and running setup."""
for repo in selected_repos:
repo_identifier = get_repo_identifier(repo, all_repos)
repo_dir = get_repo_dir(repositories_base_dir,repo)
if not os.path.exists(repo_dir):
print(f"Repository directory '{repo_dir}' does not exist. Clone it first.")
continue
create_executable(repo, repositories_base_dir, bin_dir, all_repos, quiet=quiet, preview=preview, no_verification=no_verification)
setup_cmd = repo.get("setup")
if setup_cmd:
run_command(setup_cmd, cwd=repo_dir, preview=preview)
def exec_git_command(selected_repos, repositories_base_dir, all_repos, git_cmd, extra_args, preview=False):
"""Execute a given git command with extra arguments for each repository."""
for repo in selected_repos:
repo_identifier = get_repo_identifier(repo, all_repos)
repo_dir = get_repo_dir(repositories_base_dir,repo)
if os.path.exists(repo_dir):
full_cmd = f"git {git_cmd} {' '.join(extra_args)}"
run_command(full_cmd, cwd=repo_dir, preview=preview)
else:
print(f"Repository directory '{repo_dir}' not found for {repo_identifier}.")
def status_repos(selected_repos, repositories_base_dir, all_repos, extra_args, list_only=False, system_status=False, preview=False):
if system_status:
print("System status:")
run_command("yay -Qu", preview=preview)
if list_only:
for repo in selected_repos:
print(get_repo_identifier(repo, all_repos))
else:
exec_git_command(selected_repos, repositories_base_dir, all_repos, "status", extra_args, preview)
def get_repo_dir(repositories_base_dir:str,repo:{})->str:
try:
return os.path.join(repositories_base_dir, repo.get("provider"), repo.get("account"), repo.get("repository"))
except TypeError as e:
if repositories_base_dir:
print(f"Error: {e} \nThe repository {repo} seems not correct configured.\nPlease configure it correct.")
for key in ["provider","account","repository"]:
if not repo.get(key,False):
print(f"Key '{key}' is missing.")
else:
print(f"Error: {e} \nThe base {base} seems not correct configured.\nPlease configure it correct.")
sys.exit(1)
def clone_repos(selected_repos, repositories_base_dir: str, all_repos, preview=False):
import subprocess # ensure subprocess is imported
for repo in selected_repos:
repo_identifier = get_repo_identifier(repo, all_repos)
repo_dir = get_repo_dir(repositories_base_dir, repo)
if os.path.exists(repo_dir):
print(f"[INFO] Repository '{repo_identifier}' already exists at '{repo_dir}'. Skipping clone.")
continue
parent_dir = os.path.dirname(repo_dir)
os.makedirs(parent_dir, exist_ok=True)
# Attempt SSH clone first.
target = repo.get("replacement") if repo.get("replacement") else f"{repo.get('provider')}:{repo.get('account')}/{repo.get('repository')}"
clone_url = f"git@{target}.git"
print(f"[INFO] Attempting to clone '{repo_identifier}' using SSH from {clone_url} into '{repo_dir}'.")
if preview:
print(f"[Preview] Would run: git clone {clone_url} {repo_dir} in {parent_dir}")
# Simulate a successful clone in preview mode.
result = subprocess.CompletedProcess(args=[], returncode=0)
else:
result = subprocess.run(f"git clone {clone_url} {repo_dir}", cwd=parent_dir, shell=True)
# If SSH clone returns an error code, ask user whether to try HTTPS.
if result.returncode != 0:
print(f"[WARNING] SSH clone failed for '{repo_identifier}' with return code {result.returncode}.")
choice = input("Do you want to attempt HTTPS clone instead? (y/N): ").strip().lower()
if choice == 'y':
target = repo.get("replacement") if repo.get("replacement") else f"{repo.get('provider')}/{repo.get('account')}/{repo.get('repository')}"
clone_url = f"https://{target}.git"
print(f"[INFO] Attempting to clone '{repo_identifier}' using HTTPS from {clone_url} into '{repo_dir}'.")
if preview:
print(f"[Preview] Would run: git clone {clone_url} {repo_dir} in {parent_dir}")
else:
subprocess.run(f"git clone {clone_url} {repo_dir}", cwd=parent_dir, shell=True)
else:
print(f"[INFO] HTTPS clone not attempted for '{repo_identifier}'.")
def deinstall_repos(selected_repos, repositories_base_dir, bin_dir, all_repos, preview=False):
for repo in selected_repos:
repo_identifier = get_repo_identifier(repo, all_repos)
alias_path = os.path.join(bin_dir, repo_identifier)
if os.path.exists(alias_path):
if preview:
print(f"[Preview] Would remove executable '{alias_path}'.")
else:
os.remove(alias_path)
print(f"Removed executable for {repo_identifier}.")
else:
print(f"No executable found for {repo_identifier} in {bin_dir}.")
teardown_cmd = repo.get("teardown")
repo_dir = get_repo_dir(repositories_base_dir,repo)
if teardown_cmd and os.path.exists(repo_dir):
run_command(teardown_cmd, cwd=repo_dir, preview=preview)
def delete_repos(selected_repos, repositories_base_dir, all_repos, preview=False):
for repo in selected_repos:
repo_identifier = get_repo_identifier(repo, all_repos)
repo_dir = get_repo_dir(repositories_base_dir, repo)
if os.path.exists(repo_dir):
confirm = input(f"Are you sure you want to delete directory '{repo_dir}' for {repo_identifier}? [y/N]: ").strip().lower()
if confirm == "y":
if preview:
print(f"[Preview] Would delete directory '{repo_dir}' for {repo_identifier}.")
else:
try:
shutil.rmtree(repo_dir)
print(f"Deleted repository directory '{repo_dir}' for {repo_identifier}.")
except Exception as e:
print(f"Error deleting '{repo_dir}' for {repo_identifier}: {e}")
else:
print(f"Skipped deletion of '{repo_dir}' for {repo_identifier}.")
else:
print(f"Repository directory '{repo_dir}' not found for {repo_identifier}.")
def update_repos(selected_repos, repositories_base_dir, bin_dir, all_repos:[], no_verification:bool, system_update=False, preview=False, quiet=False):
git_default_exec(selected_repos, repositories_base_dir, all_repos, extra_args=[],command="pull", preview=preview)
install_repos(selected_repos, repositories_base_dir, bin_dir, all_repos, no_verification, preview=preview, quiet=quiet)
if system_update:
run_command("yay -Syu", preview=preview)
run_command("sudo pacman -Syyu", preview=preview)
def git_default_exec(selected_repos, repositories_base_dir, all_repos, extra_args, command:str, preview=False):
exec_git_command(selected_repos, repositories_base_dir, all_repos, command, extra_args, preview)
def show_config(selected_repos, full_config=False):
"""Display configuration for one or more repositories, or the entire merged config."""
if full_config:
merged = load_config()
print(yaml.dump(merged, default_flow_style=False))
else:
for repo in selected_repos:
identifier = f'{repo.get("provider")}/{repo.get("account")}/{repo.get("repository")}'
print(f"Repository: {identifier}")
for key, value in repo.items():
print(f" {key}: {value}")
print("-" * 40)
def interactive_add(config):
"""Interactively prompt the user to add a new repository entry to the user config."""
print("Adding a new repository configuration entry.")
new_entry = {}
new_entry["provider"] = input("Provider (e.g., github.com): ").strip()
new_entry["account"] = input("Account (e.g., yourusername): ").strip()
new_entry["repository"] = input("Repository name (e.g., mytool): ").strip()
new_entry["verified"] = input("Verified commit id: ").strip()
new_entry["command"] = input("Command (optional, leave blank to auto-detect): ").strip()
new_entry["description"] = input("Description (optional): ").strip()
new_entry["replacement"] = input("Replacement (optional): ").strip()
new_entry["setup"] = input("Setup command (optional): ").strip()
new_entry["teardown"] = input("Teardown command (optional): ").strip()
new_entry["alias"] = input("Alias (optional): ").strip()
# Allow the user to mark this entry as ignored.
ignore_val = input("Ignore this entry? (y/N): ").strip().lower()
if ignore_val == "y":
new_entry["ignore"] = True
print("\nNew entry:")
for key, value in new_entry.items():
if value:
print(f"{key}: {value}")
confirm = input("Add this entry to user config? (y/N): ").strip().lower()
if confirm == "y":
if os.path.exists(USER_CONFIG_PATH):
with open(USER_CONFIG_PATH, 'r') as f:
user_config = yaml.safe_load(f) or {}
else:
user_config = {"repositories": []}
user_config.setdefault("repositories", [])
user_config["repositories"].append(new_entry)
save_user_config(user_config)
else:
print("Entry not added.")
def edit_config():
"""Open the user configuration file in nano."""
run_command(f"nano {USER_CONFIG_PATH}")
def config_init(user_config, defaults_config, bin_dir):
"""
Scan the base directory (defaults_config["base"]) for repositories.
The folder structure is assumed to be:
{base}/{provider}/{account}/{repository}
For each repository found, automatically determine:
- provider, account, repository from folder names.
- verified: the latest commit (via 'git log -1 --format=%H').
- alias: generated from the repository name using generate_alias().
Repositories already defined in defaults_config["repositories"] or user_config["repositories"] are skipped.
"""
repositories_base_dir = os.path.expanduser(defaults_config["directories"]["repositories"])
if not os.path.isdir(repositories_base_dir):
print(f"Base directory '{repositories_base_dir}' does not exist.")
return
default_keys = {(entry.get("provider"), entry.get("account"), entry.get("repository"))
for entry in defaults_config.get("repositories", [])}
existing_keys = {(entry.get("provider"), entry.get("account"), entry.get("repository"))
for entry in user_config.get("repositories", [])}
existing_aliases = {entry.get("alias") for entry in user_config.get("repositories", []) if entry.get("alias")}
new_entries = []
for provider in os.listdir(repositories_base_dir):
provider_path = os.path.join(repositories_base_dir, provider)
if not os.path.isdir(provider_path):
continue
for account in os.listdir(provider_path):
account_path = os.path.join(provider_path, account)
if not os.path.isdir(account_path):
continue
for repo_name in os.listdir(account_path):
repo_path = os.path.join(account_path, repo_name)
if not os.path.isdir(repo_path):
continue
key = (provider, account, repo_name)
if key in default_keys or key in existing_keys:
continue
try:
result = subprocess.run(
["git", "log", "-1", "--format=%H"],
cwd=repo_path,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
check=True,
)
verified = result.stdout.strip()
except Exception as e:
verified = ""
print(f"Could not determine latest commit for {repo_name} ({provider}/{account}): {e}")
entry = {
"provider": provider,
"account": account,
"repository": repo_name,
"verified": verified,
"ignore": True
}
alias = generate_alias({"repository": repo_name, "provider": provider, "account": account}, bin_dir, existing_aliases)
entry["alias"] = alias
existing_aliases.add(alias)
new_entries.append(entry)
print(f"Adding new repo entry: {entry}")
if new_entries:
user_config.setdefault("repositories", []).extend(new_entries)
save_user_config(user_config)
else:
print("No new repositories found.")
def get_selected_repos(show_all:bool,all_repos_list,identifiers=None):
if show_all:
selected = all_repos_list
else:
selected = resolve_repos(identifiers, all_repos_list)
filtered = filter_ignored(selected)
if not selected:
print("Error: No repositories had been selected.")
sys.exit(1)
return filtered
def list_repositories(all_repos, repositories_base_dir, bin_dir, search_filter="", status_filter=""):
"""
List all repositories with their attributes and status information.
Parameters:
all_repos (list): List of repository configurations.
repositories_base_dir (str): The base directory where repositories are located.
bin_dir (str): The directory where executable wrappers are stored.
search_filter (str): Filter for repository attributes (case insensitive).
status_filter (str): Filter for computed status info (case insensitive).
For each repository, the identifier is printed in bold, the description (if available)
in italic, then all other attributes and computed status are printed.
If the repository is installed, a hint is displayed under the attributes.
Repositories are filtered out if either the search_filter is not found in any attribute or
if the status_filter is not found in the computed status string.
"""
search_filter = search_filter.lower() if search_filter else ""
status_filter = status_filter.lower() if status_filter else ""
# Define status colors using colors not used for other attributes:
# Avoid red (for ignore), blue (for homepage) and yellow (for verified).
status_colors = {
"Installed": "\033[1;32m", # Green
"Not Installed": "\033[1;35m", # Magenta
"Cloned": "\033[1;36m", # Cyan
"Clonable": "\033[1;37m", # White
"Ignored": "\033[38;5;208m", # Orange (extended)
"Active": "\033[38;5;129m", # Light Purple (extended)
"Installable": "\033[38;5;82m" # Light Green (extended)
}
for repo in all_repos:
# Combine all attribute values into one string for filtering.
repo_text = " ".join(str(v) for v in repo.values()).lower()
if search_filter and search_filter not in repo_text:
continue
# Compute status information for the repository.
identifier = get_repo_identifier(repo, all_repos)
executable_path = os.path.join(bin_dir, identifier)
repo_dir = get_repo_dir(repositories_base_dir, repo)
status_list = []
# Check if the executable exists (Installed).
if os.path.exists(executable_path):
status_list.append("Installed")
else:
status_list.append("Not Installed")
# Check if the repository directory exists (Cloned).
if os.path.exists(repo_dir):
status_list.append("Cloned")
else:
status_list.append("Clonable")
# Mark ignored repositories.
if repo.get("ignore", False):
status_list.append("Ignored")
else:
status_list.append("Active")
# Define installable as cloned but not installed.
if os.path.exists(repo_dir) and not os.path.exists(executable_path):
status_list.append("Installable")
# Build a colored status string.
colored_statuses = [f"{status_colors.get(s, '')}{s}\033[0m" for s in status_list]
status_str = ", ".join(colored_statuses)
# If a status_filter is provided, only display repos whose status contains the filter.
if status_filter and status_filter not in status_str.lower():
continue
# Display repository details:
# Print the identifier in bold.
print(f"\033[1m{identifier}\033[0m")
# Print the description in italic if it exists.
description = repo.get("description")
if description:
print(f"\n\033[3m{description}\033[0m")
print("\nAttributes:")
# Loop through all attributes.
for key, value in repo.items():
formatted_value = str(value)
# Special formatting for "verified" attribute (yellow).
if key == "verified" and value:
formatted_value = f"\033[1;33m{value}\033[0m"
# Special formatting for "ignore" flag (red if True).
if key == "ignore" and value:
formatted_value = f"\033[1;31m{value}\033[0m"
if key == "description":
continue
# Highlight homepage in blue.
if key.lower() == "homepage" and value:
formatted_value = f"\033[1;34m{value}\033[0m"
print(f" {key}: {formatted_value}")
# Always display the computed status.
print(f" Status: {status_str}")
# If the repository is installed, display a hint for more info.
if os.path.exists(executable_path):
print(f"\nMore information and help: \033[1;4mpkgmgr {identifier} --help\033[0m\n")
print("-" * 40)
# Main program.
if __name__ == "__main__":
config_merged = load_config()
config_merged = load_config(USER_CONFIG_PATH)
repositories_base_dir = os.path.expanduser(config_merged["directories"]["repositories"])
all_repos_list = config_merged["repositories"]
description_text = """\
@@ -697,7 +147,7 @@ For detailed help on each command, use:
if args.command == "clone":
clone_repos(selected, repositories_base_dir, all_repos_list, args.preview)
else:
git_default_exec(selected, repositories_base_dir, all_repos_list, args.extra_args, args.command, preview=args.preview)
exec_git_command(selected, repositories_base_dir, all_repos_list, args.command, args.extra_args, preview)
elif args.command == "list":
list_repositories(all_repos_list, repositories_base_dir, BIN_DIR, search_filter=args.search, status_filter=args.status)
elif args.command == "deinstall":
@@ -764,15 +214,16 @@ For detailed help on each command, use:
elif args.command == "config":
if args.subcommand == "show":
if args.all or (not args.identifiers):
show_config([], full_config=True)
show_config([], USER_CONFIG_PATH, full_config=True)
else:
selected = resolve_repos(args.identifiers, all_repos_list)
if selected:
show_config(selected, full_config=False)
show_config(selected, USER_CONFIG_PATH, full_config=False)
elif args.subcommand == "add":
interactive_add(config_merged)
elif args.subcommand == "edit":
edit_config()
"""Open the user configuration file in nano."""
run_command(f"nano {USER_CONFIG_PATH}")
elif args.subcommand == "init":
if os.path.exists(USER_CONFIG_PATH):
with open(USER_CONFIG_PATH, 'r') as f:

0
pkgmgr/__init__.py Normal file
View File

40
pkgmgr/clone_repos.py Normal file
View File

@@ -0,0 +1,40 @@
import subprocess
import os
def clone_repos(selected_repos, repositories_base_dir: str, all_repos, preview=False):
for repo in selected_repos:
repo_identifier = get_repo_identifier(repo, all_repos)
repo_dir = get_repo_dir(repositories_base_dir, repo)
if os.path.exists(repo_dir):
print(f"[INFO] Repository '{repo_identifier}' already exists at '{repo_dir}'. Skipping clone.")
continue
parent_dir = os.path.dirname(repo_dir)
os.makedirs(parent_dir, exist_ok=True)
# Attempt SSH clone first.
target = repo.get("replacement") if repo.get("replacement") else f"{repo.get('provider')}:{repo.get('account')}/{repo.get('repository')}"
clone_url = f"git@{target}.git"
print(f"[INFO] Attempting to clone '{repo_identifier}' using SSH from {clone_url} into '{repo_dir}'.")
if preview:
print(f"[Preview] Would run: git clone {clone_url} {repo_dir} in {parent_dir}")
# Simulate a successful clone in preview mode.
result = subprocess.CompletedProcess(args=[], returncode=0)
else:
result = subprocess.run(f"git clone {clone_url} {repo_dir}", cwd=parent_dir, shell=True)
# If SSH clone returns an error code, ask user whether to try HTTPS.
if result.returncode != 0:
print(f"[WARNING] SSH clone failed for '{repo_identifier}' with return code {result.returncode}.")
choice = input("Do you want to attempt HTTPS clone instead? (y/N): ").strip().lower()
if choice == 'y':
target = repo.get("replacement") if repo.get("replacement") else f"{repo.get('provider')}/{repo.get('account')}/{repo.get('repository')}"
clone_url = f"https://{target}.git"
print(f"[INFO] Attempting to clone '{repo_identifier}' using HTTPS from {clone_url} into '{repo_dir}'.")
if preview:
print(f"[Preview] Would run: git clone {clone_url} {repo_dir} in {parent_dir}")
else:
subprocess.run(f"git clone {clone_url} {repo_dir}", cwd=parent_dir, shell=True)
else:
print(f"[INFO] HTTPS clone not attempted for '{repo_identifier}'.")

73
pkgmgr/config_init.py Normal file
View File

@@ -0,0 +1,73 @@
import subprocess
import os
def config_init(user_config, defaults_config, bin_dir):
"""
Scan the base directory (defaults_config["base"]) for repositories.
The folder structure is assumed to be:
{base}/{provider}/{account}/{repository}
For each repository found, automatically determine:
- provider, account, repository from folder names.
- verified: the latest commit (via 'git log -1 --format=%H').
- alias: generated from the repository name using generate_alias().
Repositories already defined in defaults_config["repositories"] or user_config["repositories"] are skipped.
"""
repositories_base_dir = os.path.expanduser(defaults_config["directories"]["repositories"])
if not os.path.isdir(repositories_base_dir):
print(f"Base directory '{repositories_base_dir}' does not exist.")
return
default_keys = {(entry.get("provider"), entry.get("account"), entry.get("repository"))
for entry in defaults_config.get("repositories", [])}
existing_keys = {(entry.get("provider"), entry.get("account"), entry.get("repository"))
for entry in user_config.get("repositories", [])}
existing_aliases = {entry.get("alias") for entry in user_config.get("repositories", []) if entry.get("alias")}
new_entries = []
for provider in os.listdir(repositories_base_dir):
provider_path = os.path.join(repositories_base_dir, provider)
if not os.path.isdir(provider_path):
continue
for account in os.listdir(provider_path):
account_path = os.path.join(provider_path, account)
if not os.path.isdir(account_path):
continue
for repo_name in os.listdir(account_path):
repo_path = os.path.join(account_path, repo_name)
if not os.path.isdir(repo_path):
continue
key = (provider, account, repo_name)
if key in default_keys or key in existing_keys:
continue
try:
result = subprocess.run(
["git", "log", "-1", "--format=%H"],
cwd=repo_path,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
check=True,
)
verified = result.stdout.strip()
except Exception as e:
verified = ""
print(f"Could not determine latest commit for {repo_name} ({provider}/{account}): {e}")
entry = {
"provider": provider,
"account": account,
"repository": repo_name,
"verified": verified,
"ignore": True
}
alias = generate_alias({"repository": repo_name, "provider": provider, "account": account}, bin_dir, existing_aliases)
entry["alias"] = alias
existing_aliases.add(alias)
new_entries.append(entry)
print(f"Adding new repo entry: {entry}")
if new_entries:
user_config.setdefault("repositories", []).extend(new_entries)
save_user_config(user_config)
else:
print("No new repositories found.")

View File

@@ -0,0 +1,73 @@
import os
def create_executable(repo, repositories_base_dir, bin_dir, all_repos, quiet=False, preview=False, no_verification=False):
"""
Create an executable bash wrapper for the repository.
If 'verified' is set, the wrapper will checkout that commit and warn (unless quiet is True).
If no verified commit is set, a warning is printed unless quiet is True.
If an 'alias' field is provided, a symlink is created in bin_dir with that alias.
"""
repo_identifier = get_repo_identifier(repo, all_repos)
repo_dir = get_repo_dir(repositories_base_dir,repo)
command = repo.get("command")
if not command:
main_sh = os.path.join(repo_dir, "main.sh")
main_py = os.path.join(repo_dir, "main.py")
if os.path.exists(main_sh):
command = "bash main.sh"
elif os.path.exists(main_py):
command = "python3 main.py"
else:
if not quiet:
print(f"No command defined and no main.sh/main.py found in {repo_dir}. Skipping alias creation.")
return
ORANGE = r"\033[38;5;208m"
RESET = r"\033[0m"
if no_verification:
preamble = ""
else:
if verified := repo.get("verified"):
if not quiet:
preamble = f"""\
git checkout {verified} || echo -e "{ORANGE}Warning: Failed to checkout commit {verified}.{RESET}"
CURRENT=$(git rev-parse HEAD)
if [ "$CURRENT" != "{verified}" ]; then
echo -e "{ORANGE}Warning: Current commit ($CURRENT) does not match verified commit ({verified}).{RESET}"
fi
"""
else:
preamble = ""
else:
preamble = "" if quiet else f'echo -e "{ORANGE}Warning: No verified commit set for this repository.{RESET}"'
script_content = f"""#!/bin/bash
cd "{repo_dir}"
{preamble}
{command} "$@"
"""
alias_path = os.path.join(bin_dir, repo_identifier)
if preview:
print(f"[Preview] Would create executable '{alias_path}' with content:\n{script_content}")
else:
os.makedirs(bin_dir, exist_ok=True)
with open(alias_path, "w") as f:
f.write(script_content)
os.chmod(alias_path, 0o755)
if not quiet:
print(f"Installed executable for {repo_identifier} at {alias_path}")
alias_name = repo.get("alias")
if alias_name:
alias_link_path = os.path.join(bin_dir, alias_name)
try:
if os.path.exists(alias_link_path) or os.path.islink(alias_link_path):
os.remove(alias_link_path)
os.symlink(alias_path, alias_link_path)
if not quiet:
print(f"Created alias '{alias_name}' pointing to {repo_identifier}")
except Exception as e:
if not quiet:
print(f"Error creating alias '{alias_name}': {e}")

18
pkgmgr/deinstall_repos.py Normal file
View File

@@ -0,0 +1,18 @@
import os
def deinstall_repos(selected_repos, repositories_base_dir, bin_dir, all_repos, preview=False):
for repo in selected_repos:
repo_identifier = get_repo_identifier(repo, all_repos)
alias_path = os.path.join(bin_dir, repo_identifier)
if os.path.exists(alias_path):
if preview:
print(f"[Preview] Would remove executable '{alias_path}'.")
else:
os.remove(alias_path)
print(f"Removed executable for {repo_identifier}.")
else:
print(f"No executable found for {repo_identifier} in {bin_dir}.")
teardown_cmd = repo.get("teardown")
repo_dir = get_repo_dir(repositories_base_dir,repo)
if teardown_cmd and os.path.exists(repo_dir):
run_command(teardown_cmd, cwd=repo_dir, preview=preview)

22
pkgmgr/delete_repos.py Normal file
View File

@@ -0,0 +1,22 @@
import shutil
import os
def delete_repos(selected_repos, repositories_base_dir, all_repos, preview=False):
for repo in selected_repos:
repo_identifier = get_repo_identifier(repo, all_repos)
repo_dir = get_repo_dir(repositories_base_dir, repo)
if os.path.exists(repo_dir):
confirm = input(f"Are you sure you want to delete directory '{repo_dir}' for {repo_identifier}? [y/N]: ").strip().lower()
if confirm == "y":
if preview:
print(f"[Preview] Would delete directory '{repo_dir}' for {repo_identifier}.")
else:
try:
shutil.rmtree(repo_dir)
print(f"Deleted repository directory '{repo_dir}' for {repo_identifier}.")
except Exception as e:
print(f"Error deleting '{repo_dir}' for {repo_identifier}: {e}")
else:
print(f"Skipped deletion of '{repo_dir}' for {repo_identifier}.")
else:
print(f"Repository directory '{repo_dir}' not found for {repo_identifier}.")

View File

@@ -0,0 +1,12 @@
import os
def exec_git_command(selected_repos, repositories_base_dir, all_repos, git_cmd, extra_args, preview=False):
"""Execute a given git command with extra arguments for each repository."""
for repo in selected_repos:
repo_identifier = get_repo_identifier(repo, all_repos)
repo_dir = get_repo_dir(repositories_base_dir,repo)
if os.path.exists(repo_dir):
full_cmd = f"git {git_cmd} {' '.join(extra_args)}"
run_command(full_cmd, cwd=repo_dir, preview=preview)
else:
print(f"Repository directory '{repo_dir}' not found for {repo_identifier}.")

3
pkgmgr/filter_ignored.py Normal file
View File

@@ -0,0 +1,3 @@
def filter_ignored(repos):
"""Filter out repositories that have 'ignore' set to True."""
return [r for r in repos if not r.get("ignore", False)]

42
pkgmgr/generate_alias.py Normal file
View File

@@ -0,0 +1,42 @@
import os
import hashlib
import re
def generate_alias(repo, bin_dir, existing_aliases):
"""
Generate an alias for a repository based on its repository name.
Steps:
1. Keep only consonants from the repository name (letters from BCDFGHJKLMNPQRSTVWXYZ).
2. Collapse consecutive identical consonants.
3. Truncate to at most 12 characters.
4. If that alias conflicts (already in existing_aliases or a file exists in bin_dir),
then prefix with the first letter of provider and account.
5. If still conflicting, append a three-character hash until the alias is unique.
"""
repo_name = repo.get("repository")
# Keep only consonants.
consonants = re.sub(r"[^bcdfghjklmnpqrstvwxyzBCDFGHJKLMNPQRSTVWXYZ]", "", repo_name)
# Collapse consecutive identical consonants.
collapsed = re.sub(r"(.)\1+", r"\1", consonants)
base_alias = collapsed[:12] if len(collapsed) > 12 else collapsed
candidate = base_alias.lower()
def conflict(alias):
alias_path = os.path.join(bin_dir, alias)
return alias in existing_aliases or os.path.exists(alias_path)
if not conflict(candidate):
return candidate
prefix = (repo.get("provider", "")[0] + repo.get("account", "")[0]).lower()
candidate2 = (prefix + candidate)[:12]
if not conflict(candidate2):
return candidate2
h = hashlib.md5(repo_name.encode("utf-8")).hexdigest()[:3]
candidate3 = (candidate2 + h)[:12]
while conflict(candidate3):
candidate3 += "x"
candidate3 = candidate3[:12]
return candidate3

15
pkgmgr/get_repo_dir.py Normal file
View File

@@ -0,0 +1,15 @@
import sys
import os
def get_repo_dir(repositories_base_dir:str,repo:{})->str:
try:
return os.path.join(repositories_base_dir, repo.get("provider"), repo.get("account"), repo.get("repository"))
except TypeError as e:
if repositories_base_dir:
print(f"Error: {e} \nThe repository {repo} seems not correct configured.\nPlease configure it correct.")
for key in ["provider","account","repository"]:
if not repo.get(key,False):
print(f"Key '{key}' is missing.")
else:
print(f"Error: {e} \nThe base {base} seems not correct configured.\nPlease configure it correct.")
sys.exit(1)

View File

@@ -0,0 +1,12 @@
def get_repo_identifier(repo, all_repos):
"""
Return a unique identifier for the repository.
If the repository name is unique among all_repos, return repository name;
otherwise, return 'provider/account/repository'.
"""
repo_name = repo.get("repository")
count = sum(1 for r in all_repos if r.get("repository") == repo_name)
if count == 1:
return repo_name
else:
return f'{repo.get("provider")}/{repo.get("account")}/{repo.get("repository")}'

View File

@@ -0,0 +1,14 @@
import sys
from .resolve_repos import resolve_repos
from .filter_ignored import filter_ignored
def get_selected_repos(show_all:bool,all_repos_list,identifiers=None):
if show_all:
selected = all_repos_list
else:
selected = resolve_repos(identifiers, all_repos_list)
filtered = filter_ignored(selected)
if not selected:
print("Error: No repositories had been selected.")
sys.exit(1)
return filtered

14
pkgmgr/install_repos.py Normal file
View File

@@ -0,0 +1,14 @@
import os
def install_repos(selected_repos, repositories_base_dir, bin_dir, all_repos:[], no_verification:bool, preview=False, quiet=False):
"""Install repositories by creating executable wrappers and running setup."""
for repo in selected_repos:
repo_identifier = get_repo_identifier(repo, all_repos)
repo_dir = get_repo_dir(repositories_base_dir,repo)
if not os.path.exists(repo_dir):
print(f"Repository directory '{repo_dir}' does not exist. Clone it first.")
continue
create_executable(repo, repositories_base_dir, bin_dir, all_repos, quiet=quiet, preview=preview, no_verification=no_verification)
setup_cmd = repo.get("setup")
if setup_cmd:
run_command(setup_cmd, cwd=repo_dir, preview=preview)

38
pkgmgr/interactive_add.py Normal file
View File

@@ -0,0 +1,38 @@
import yaml
import os
def interactive_add(config):
"""Interactively prompt the user to add a new repository entry to the user config."""
print("Adding a new repository configuration entry.")
new_entry = {}
new_entry["provider"] = input("Provider (e.g., github.com): ").strip()
new_entry["account"] = input("Account (e.g., yourusername): ").strip()
new_entry["repository"] = input("Repository name (e.g., mytool): ").strip()
new_entry["verified"] = input("Verified commit id: ").strip()
new_entry["command"] = input("Command (optional, leave blank to auto-detect): ").strip()
new_entry["description"] = input("Description (optional): ").strip()
new_entry["replacement"] = input("Replacement (optional): ").strip()
new_entry["setup"] = input("Setup command (optional): ").strip()
new_entry["teardown"] = input("Teardown command (optional): ").strip()
new_entry["alias"] = input("Alias (optional): ").strip()
# Allow the user to mark this entry as ignored.
ignore_val = input("Ignore this entry? (y/N): ").strip().lower()
if ignore_val == "y":
new_entry["ignore"] = True
print("\nNew entry:")
for key, value in new_entry.items():
if value:
print(f"{key}: {value}")
confirm = input("Add this entry to user config? (y/N): ").strip().lower()
if confirm == "y":
if os.path.exists(USER_CONFIG_PATH):
with open(USER_CONFIG_PATH, 'r') as f:
user_config = yaml.safe_load(f) or {}
else:
user_config = {"repositories": []}
user_config.setdefault("repositories", [])
user_config["repositories"].append(new_entry)
save_user_config(user_config)
else:
print("Entry not added.")

102
pkgmgr/list_repositories.py Normal file
View File

@@ -0,0 +1,102 @@
import os
def list_repositories(all_repos, repositories_base_dir, bin_dir, search_filter="", status_filter=""):
"""
List all repositories with their attributes and status information.
Parameters:
all_repos (list): List of repository configurations.
repositories_base_dir (str): The base directory where repositories are located.
bin_dir (str): The directory where executable wrappers are stored.
search_filter (str): Filter for repository attributes (case insensitive).
status_filter (str): Filter for computed status info (case insensitive).
For each repository, the identifier is printed in bold, the description (if available)
in italic, then all other attributes and computed status are printed.
If the repository is installed, a hint is displayed under the attributes.
Repositories are filtered out if either the search_filter is not found in any attribute or
if the status_filter is not found in the computed status string.
"""
search_filter = search_filter.lower() if search_filter else ""
status_filter = status_filter.lower() if status_filter else ""
# Define status colors using colors not used for other attributes:
# Avoid red (for ignore), blue (for homepage) and yellow (for verified).
status_colors = {
"Installed": "\033[1;32m", # Green
"Not Installed": "\033[1;35m", # Magenta
"Cloned": "\033[1;36m", # Cyan
"Clonable": "\033[1;37m", # White
"Ignored": "\033[38;5;208m", # Orange (extended)
"Active": "\033[38;5;129m", # Light Purple (extended)
"Installable": "\033[38;5;82m" # Light Green (extended)
}
for repo in all_repos:
# Combine all attribute values into one string for filtering.
repo_text = " ".join(str(v) for v in repo.values()).lower()
if search_filter and search_filter not in repo_text:
continue
# Compute status information for the repository.
identifier = get_repo_identifier(repo, all_repos)
executable_path = os.path.join(bin_dir, identifier)
repo_dir = get_repo_dir(repositories_base_dir, repo)
status_list = []
# Check if the executable exists (Installed).
if os.path.exists(executable_path):
status_list.append("Installed")
else:
status_list.append("Not Installed")
# Check if the repository directory exists (Cloned).
if os.path.exists(repo_dir):
status_list.append("Cloned")
else:
status_list.append("Clonable")
# Mark ignored repositories.
if repo.get("ignore", False):
status_list.append("Ignored")
else:
status_list.append("Active")
# Define installable as cloned but not installed.
if os.path.exists(repo_dir) and not os.path.exists(executable_path):
status_list.append("Installable")
# Build a colored status string.
colored_statuses = [f"{status_colors.get(s, '')}{s}\033[0m" for s in status_list]
status_str = ", ".join(colored_statuses)
# If a status_filter is provided, only display repos whose status contains the filter.
if status_filter and status_filter not in status_str.lower():
continue
# Display repository details:
# Print the identifier in bold.
print(f"\033[1m{identifier}\033[0m")
# Print the description in italic if it exists.
description = repo.get("description")
if description:
print(f"\n\033[3m{description}\033[0m")
print("\nAttributes:")
# Loop through all attributes.
for key, value in repo.items():
formatted_value = str(value)
# Special formatting for "verified" attribute (yellow).
if key == "verified" and value:
formatted_value = f"\033[1;33m{value}\033[0m"
# Special formatting for "ignore" flag (red if True).
if key == "ignore" and value:
formatted_value = f"\033[1;31m{value}\033[0m"
if key == "description":
continue
# Highlight homepage in blue.
if key.lower() == "homepage" and value:
formatted_value = f"\033[1;34m{value}\033[0m"
print(f" {key}: {formatted_value}")
# Always display the computed status.
print(f" Status: {status_str}")
# If the repository is installed, display a hint for more info.
if os.path.exists(executable_path):
print(f"\nMore information and help: \033[1;4mpkgmgr {identifier} --help\033[0m\n")
print("-" * 40)

25
pkgmgr/load_config.py Normal file
View File

@@ -0,0 +1,25 @@
import sys
import yaml
import os
DEFAULT_CONFIG_PATH = os.path.join("config", "defaults.yaml")
def load_config(user_config_path):
"""Load configuration from defaults and merge in user config if present."""
if not os.path.exists(DEFAULT_CONFIG_PATH):
print(f"Default configuration file '{DEFAULT_CONFIG_PATH}' not found.")
sys.exit(1)
with open(DEFAULT_CONFIG_PATH, 'r') as f:
config = yaml.safe_load(f)
if "directories" not in config or "repositories" not in config:
print("Default config file must contain 'directories' and 'repositories' keys.")
sys.exit(1)
if os.path.exists(user_config_path):
with open(user_config_path, 'r') as f:
user_config = yaml.safe_load(f)
if user_config:
if "directories" in user_config:
config["directories"] = user_config["directories"]
if "repositories" in user_config:
config["repositories"].extend(user_config["repositories"])
return config

28
pkgmgr/resolve_repos.py Normal file
View File

@@ -0,0 +1,28 @@
import os
def resolve_repos(identifiers, all_repos):
"""
Given a list of identifier strings, return a list of repository configs.
The identifier can be:
- the full identifier "provider/account/repository"
- the repository name (if unique among all_repos)
- the alias (if defined)
"""
selected = []
for ident in identifiers:
matches = []
for repo in all_repos:
full_id = f'{repo.get("provider")}/{repo.get("account")}/{repo.get("repository")}'
if ident == full_id:
matches.append(repo)
elif ident == repo.get("alias"):
matches.append(repo)
elif ident == repo.get("repository"):
# Only match if repository name is unique among all_repos.
if sum(1 for r in all_repos if r.get("repository") == ident) == 1:
matches.append(repo)
if not matches:
print(f"Identifier '{ident}' did not match any repository in config.")
else:
selected.extend(matches)
return selected

18
pkgmgr/run_command.py Normal file
View File

@@ -0,0 +1,18 @@
import sys
import subprocess
import os
def run_command(command, cwd=None, preview=False):
"""Run a shell command in a given directory, or print it in preview mode.
If the command fails, exit the program with the command's exit code.
"""
current_dir = cwd or os.getcwd()
if preview:
print(f"[Preview] In '{current_dir}': {command}")
else:
print(f"Running in '{current_dir}': {command}")
result = subprocess.run(command, cwd=cwd, shell=True, check=False)
if result.returncode != 0:
print(f"Command failed with exit code {result.returncode}. Exiting.")
sys.exit(result.returncode)

View File

@@ -0,0 +1,9 @@
import yaml
import os
def save_user_config(user_config):
"""Save the user configuration to USER_CONFIG_PATH."""
os.makedirs(os.path.dirname(USER_CONFIG_PATH), exist_ok=True)
with open(USER_CONFIG_PATH, 'w') as f:
yaml.dump(user_config, f)
print(f"User configuration updated in {USER_CONFIG_PATH}.")

14
pkgmgr/show_config.py Normal file
View File

@@ -0,0 +1,14 @@
import yaml
def show_config(selected_repos, user_config_path, full_config=False):
"""Display configuration for one or more repositories, or the entire merged config."""
if full_config:
merged = load_config(user_config_path)
print(yaml.dump(merged, default_flow_style=False))
else:
for repo in selected_repos:
identifier = f'{repo.get("provider")}/{repo.get("account")}/{repo.get("repository")}'
print(f"Repository: {identifier}")
for key, value in repo.items():
print(f" {key}: {value}")
print("-" * 40)

11
pkgmgr/status_repos.py Normal file
View File

@@ -0,0 +1,11 @@
import sys
def status_repos(selected_repos, repositories_base_dir, all_repos, extra_args, list_only=False, system_status=False, preview=False):
if system_status:
print("System status:")
run_command("yay -Qu", preview=preview)
if list_only:
for repo in selected_repos:
print(get_repo_identifier(repo, all_repos))
else:
exec_git_command(selected_repos, repositories_base_dir, all_repos, "status", extra_args, preview)

8
pkgmgr/update_repos.py Normal file
View File

@@ -0,0 +1,8 @@
import sys
def update_repos(selected_repos, repositories_base_dir, bin_dir, all_repos:[], no_verification:bool, system_update=False, preview=False, quiet=False):
git_default_exec(selected_repos, repositories_base_dir, all_repos, extra_args=[],command="pull", preview=preview)
install_repos(selected_repos, repositories_base_dir, bin_dir, all_repos, no_verification, preview=preview, quiet=quiet)
if system_update:
run_command("yay -Syu", preview=preview)
run_command("sudo pacman -Syyu", preview=preview)