770 lines
37 KiB
Python
770 lines
37 KiB
Python
import re
|
||
import hashlib
|
||
import os
|
||
import subprocess
|
||
import shutil
|
||
import sys
|
||
import yaml
|
||
import argparse
|
||
import json
|
||
|
||
# Define configuration file paths.
|
||
DEFAULT_CONFIG_PATH = os.path.join("config", "defaults.yaml")
|
||
USER_CONFIG_PATH = os.path.join("config", "config.yaml")
|
||
BIN_DIR = os.path.expanduser("~/.local/bin")
|
||
|
||
# Commands proxied by package-manager
|
||
GIT_DEFAULT_COMMANDS = [
|
||
"pull",
|
||
"push",
|
||
"diff",
|
||
"add",
|
||
"show",
|
||
"checkout",
|
||
"clone",
|
||
"reset",
|
||
"revert"
|
||
]
|
||
|
||
def load_config():
|
||
"""Load configuration from defaults and merge in user config if present."""
|
||
if not os.path.exists(DEFAULT_CONFIG_PATH):
|
||
print(f"Default configuration file '{DEFAULT_CONFIG_PATH}' not found.")
|
||
sys.exit(1)
|
||
with open(DEFAULT_CONFIG_PATH, 'r') as f:
|
||
config = yaml.safe_load(f)
|
||
if "directories" not in config or "repositories" not in config:
|
||
print("Default config file must contain 'directories' and 'repositories' keys.")
|
||
sys.exit(1)
|
||
if os.path.exists(USER_CONFIG_PATH):
|
||
with open(USER_CONFIG_PATH, 'r') as f:
|
||
user_config = yaml.safe_load(f)
|
||
if user_config:
|
||
if "directories" in user_config:
|
||
config["directories"] = user_config["directories"]
|
||
if "repositories" in user_config:
|
||
config["repositories"].extend(user_config["repositories"])
|
||
return config
|
||
|
||
def save_user_config(user_config):
|
||
"""Save the user configuration to USER_CONFIG_PATH."""
|
||
os.makedirs(os.path.dirname(USER_CONFIG_PATH), exist_ok=True)
|
||
with open(USER_CONFIG_PATH, 'w') as f:
|
||
yaml.dump(user_config, f)
|
||
print(f"User configuration updated in {USER_CONFIG_PATH}.")
|
||
|
||
def run_command(command, cwd=None, preview=False):
|
||
"""Run a shell command in a given directory, or print it in preview mode."""
|
||
if preview:
|
||
print(f"[Preview] In '{cwd or os.getcwd()}': {command}")
|
||
else:
|
||
print(f"Running in '{cwd or os.getcwd()}': {command}")
|
||
subprocess.run(command, cwd=cwd, shell=True, check=False)
|
||
|
||
def get_repo_identifier(repo, all_repos):
|
||
"""
|
||
Return a unique identifier for the repository.
|
||
If the repository name is unique among all_repos, return repository name;
|
||
otherwise, return 'provider/account/repository'.
|
||
"""
|
||
repo_name = repo.get("repository")
|
||
count = sum(1 for r in all_repos if r.get("repository") == repo_name)
|
||
if count == 1:
|
||
return repo_name
|
||
else:
|
||
return f'{repo.get("provider")}/{repo.get("account")}/{repo.get("repository")}'
|
||
|
||
def resolve_repos(identifiers, all_repos):
|
||
"""
|
||
Given a list of identifier strings, return a list of repository configs.
|
||
The identifier can be:
|
||
- the full identifier "provider/account/repository"
|
||
- the repository name (if unique among all_repos)
|
||
- the alias (if defined)
|
||
"""
|
||
selected = []
|
||
for ident in identifiers:
|
||
matches = []
|
||
for repo in all_repos:
|
||
full_id = f'{repo.get("provider")}/{repo.get("account")}/{repo.get("repository")}'
|
||
if ident == full_id:
|
||
matches.append(repo)
|
||
elif ident == repo.get("alias"):
|
||
matches.append(repo)
|
||
elif ident == repo.get("repository"):
|
||
# Only match if repository name is unique among all_repos.
|
||
if sum(1 for r in all_repos if r.get("repository") == ident) == 1:
|
||
matches.append(repo)
|
||
if not matches:
|
||
print(f"Identifier '{ident}' did not match any repository in config.")
|
||
else:
|
||
selected.extend(matches)
|
||
return selected
|
||
|
||
def filter_ignored(repos):
|
||
"""Filter out repositories that have 'ignore' set to True."""
|
||
return [r for r in repos if not r.get("ignore", False)]
|
||
|
||
def generate_alias(repo, bin_dir, existing_aliases):
|
||
"""
|
||
Generate an alias for a repository based on its repository name.
|
||
|
||
Steps:
|
||
1. Keep only consonants from the repository name (letters from BCDFGHJKLMNPQRSTVWXYZ).
|
||
2. Collapse consecutive identical consonants.
|
||
3. Truncate to at most 12 characters.
|
||
4. If that alias conflicts (already in existing_aliases or a file exists in bin_dir),
|
||
then prefix with the first letter of provider and account.
|
||
5. If still conflicting, append a three-character hash until the alias is unique.
|
||
"""
|
||
repo_name = repo.get("repository")
|
||
# Keep only consonants.
|
||
consonants = re.sub(r"[^bcdfghjklmnpqrstvwxyzBCDFGHJKLMNPQRSTVWXYZ]", "", repo_name)
|
||
# Collapse consecutive identical consonants.
|
||
collapsed = re.sub(r"(.)\1+", r"\1", consonants)
|
||
base_alias = collapsed[:12] if len(collapsed) > 12 else collapsed
|
||
candidate = base_alias.lower()
|
||
|
||
def conflict(alias):
|
||
alias_path = os.path.join(bin_dir, alias)
|
||
return alias in existing_aliases or os.path.exists(alias_path)
|
||
|
||
if not conflict(candidate):
|
||
return candidate
|
||
|
||
prefix = (repo.get("provider", "")[0] + repo.get("account", "")[0]).lower()
|
||
candidate2 = (prefix + candidate)[:12]
|
||
if not conflict(candidate2):
|
||
return candidate2
|
||
|
||
h = hashlib.md5(repo_name.encode("utf-8")).hexdigest()[:3]
|
||
candidate3 = (candidate2 + h)[:12]
|
||
while conflict(candidate3):
|
||
candidate3 += "x"
|
||
candidate3 = candidate3[:12]
|
||
return candidate3
|
||
|
||
def create_executable(repo, repositories_base_dir, bin_dir, all_repos, quiet=False, preview=False, no_verification=False):
|
||
"""
|
||
Create an executable bash wrapper for the repository.
|
||
|
||
If 'verified' is set, the wrapper will checkout that commit and warn (unless quiet is True).
|
||
If no verified commit is set, a warning is printed unless quiet is True.
|
||
If an 'alias' field is provided, a symlink is created in bin_dir with that alias.
|
||
"""
|
||
repo_identifier = get_repo_identifier(repo, all_repos)
|
||
repo_dir = get_repo_dir(repositories_base_dir,repo)
|
||
command = repo.get("command")
|
||
if not command:
|
||
main_sh = os.path.join(repo_dir, "main.sh")
|
||
main_py = os.path.join(repo_dir, "main.py")
|
||
if os.path.exists(main_sh):
|
||
command = "bash main.sh"
|
||
elif os.path.exists(main_py):
|
||
command = "python3 main.py"
|
||
else:
|
||
if not quiet:
|
||
print(f"No command defined and no main.sh/main.py found in {repo_dir}. Skipping alias creation.")
|
||
return
|
||
|
||
ORANGE = r"\033[38;5;208m"
|
||
RESET = r"\033[0m"
|
||
|
||
if no_verification:
|
||
preamble = ""
|
||
else:
|
||
if verified := repo.get("verified"):
|
||
if not quiet:
|
||
preamble = f"""\
|
||
git checkout {verified} || echo -e "{ORANGE}Warning: Failed to checkout commit {verified}.{RESET}"
|
||
CURRENT=$(git rev-parse HEAD)
|
||
if [ "$CURRENT" != "{verified}" ]; then
|
||
echo -e "{ORANGE}Warning: Current commit ($CURRENT) does not match verified commit ({verified}).{RESET}"
|
||
fi
|
||
"""
|
||
else:
|
||
preamble = ""
|
||
else:
|
||
preamble = "" if quiet else f'echo -e "{ORANGE}Warning: No verified commit set for this repository.{RESET}"'
|
||
|
||
script_content = f"""#!/bin/bash
|
||
cd "{repo_dir}"
|
||
{preamble}
|
||
{command} "$@"
|
||
"""
|
||
alias_path = os.path.join(bin_dir, repo_identifier)
|
||
if preview:
|
||
print(f"[Preview] Would create executable '{alias_path}' with content:\n{script_content}")
|
||
else:
|
||
os.makedirs(bin_dir, exist_ok=True)
|
||
with open(alias_path, "w") as f:
|
||
f.write(script_content)
|
||
os.chmod(alias_path, 0o755)
|
||
if not quiet:
|
||
print(f"Installed executable for {repo_identifier} at {alias_path}")
|
||
|
||
alias_name = repo.get("alias")
|
||
if alias_name:
|
||
alias_link_path = os.path.join(bin_dir, alias_name)
|
||
try:
|
||
if os.path.exists(alias_link_path) or os.path.islink(alias_link_path):
|
||
os.remove(alias_link_path)
|
||
os.symlink(alias_path, alias_link_path)
|
||
if not quiet:
|
||
print(f"Created alias '{alias_name}' pointing to {repo_identifier}")
|
||
except Exception as e:
|
||
if not quiet:
|
||
print(f"Error creating alias '{alias_name}': {e}")
|
||
|
||
def install_repos(selected_repos, repositories_base_dir, bin_dir, all_repos:[], no_verification:bool, preview=False, quiet=False):
|
||
"""Install repositories by creating executable wrappers and running setup."""
|
||
for repo in selected_repos:
|
||
repo_identifier = get_repo_identifier(repo, all_repos)
|
||
repo_dir = get_repo_dir(repositories_base_dir,repo)
|
||
if not os.path.exists(repo_dir):
|
||
print(f"Repository directory '{repo_dir}' does not exist. Clone it first.")
|
||
continue
|
||
create_executable(repo, repositories_base_dir, bin_dir, all_repos, quiet=quiet, preview=preview, no_verification=no_verification)
|
||
setup_cmd = repo.get("setup")
|
||
if setup_cmd:
|
||
run_command(setup_cmd, cwd=repo_dir, preview=preview)
|
||
|
||
def exec_git_command(selected_repos, repositories_base_dir, all_repos, git_cmd, extra_args, preview=False):
|
||
"""Execute a given git command with extra arguments for each repository."""
|
||
for repo in selected_repos:
|
||
repo_identifier = get_repo_identifier(repo, all_repos)
|
||
repo_dir = get_repo_dir(repositories_base_dir,repo)
|
||
if os.path.exists(repo_dir):
|
||
full_cmd = f"git {git_cmd} {' '.join(extra_args)}"
|
||
run_command(full_cmd, cwd=repo_dir, preview=preview)
|
||
else:
|
||
print(f"Repository directory '{repo_dir}' not found for {repo_identifier}.")
|
||
|
||
|
||
def status_repos(selected_repos, repositories_base_dir, all_repos, extra_args, list_only=False, system_status=False, preview=False):
|
||
if system_status:
|
||
print("System status:")
|
||
run_command("yay -Qu", preview=preview)
|
||
if list_only:
|
||
for repo in selected_repos:
|
||
print(get_repo_identifier(repo, all_repos))
|
||
else:
|
||
exec_git_command(selected_repos, repositories_base_dir, all_repos, "status", extra_args, preview)
|
||
|
||
def get_repo_dir(repositories_base_dir:str,repo:{})->str:
|
||
try:
|
||
return os.path.join(repositories_base_dir, repo.get("provider"), repo.get("account"), repo.get("repository"))
|
||
except TypeError as e:
|
||
if repositories_base_dir:
|
||
print(f"Error: {e} \nThe repository {repo} seems not correct configured.\nPlease configure it correct.")
|
||
for key in ["provider","account","repository"]:
|
||
if not repo.get(key,False):
|
||
print(f"Key '{key}' is missing.")
|
||
else:
|
||
print(f"Error: {e} \nThe base {base} seems not correct configured.\nPlease configure it correct.")
|
||
sys.exit(1)
|
||
|
||
def clone_repos(selected_repos, repositories_base_dir:str, all_repos, preview=False):
|
||
for repo in selected_repos:
|
||
repo_identifier = get_repo_identifier(repo, all_repos)
|
||
repo_dir = get_repo_dir(repositories_base_dir,repo)
|
||
if os.path.exists(repo_dir):
|
||
print(f"Repository '{repo_identifier}' already exists at '{repo_dir}'.")
|
||
continue
|
||
target = repo.get("replacement") if repo.get("replacement") else f"{repo.get('provider')}/{repo.get('account')}/{repo.get('repository')}"
|
||
clone_url = f"https://{target}.git"
|
||
parent_dir = os.path.dirname(repo_dir)
|
||
os.makedirs(parent_dir, exist_ok=True)
|
||
run_command(f"git clone {clone_url} {repo_dir}", cwd=parent_dir, preview=preview)
|
||
|
||
def deinstall_repos(selected_repos, repositories_base_dir, bin_dir, all_repos, preview=False):
|
||
for repo in selected_repos:
|
||
repo_identifier = get_repo_identifier(repo, all_repos)
|
||
alias_path = os.path.join(bin_dir, repo_identifier)
|
||
if os.path.exists(alias_path):
|
||
if preview:
|
||
print(f"[Preview] Would remove executable '{alias_path}'.")
|
||
else:
|
||
os.remove(alias_path)
|
||
print(f"Removed executable for {repo_identifier}.")
|
||
else:
|
||
print(f"No executable found for {repo_identifier} in {bin_dir}.")
|
||
teardown_cmd = repo.get("teardown")
|
||
repo_dir = get_repo_dir(repositories_base_dir,repo)
|
||
if teardown_cmd and os.path.exists(repo_dir):
|
||
run_command(teardown_cmd, cwd=repo_dir, preview=preview)
|
||
|
||
def delete_repos(selected_repos, repositories_base_dir, all_repos, preview=False):
|
||
for repo in selected_repos:
|
||
repo_identifier = get_repo_identifier(repo, all_repos)
|
||
repo_dir = get_repo_dir(repositories_base_dir,repo)
|
||
if os.path.exists(repo_dir):
|
||
if preview:
|
||
print(f"[Preview] Would delete directory '{repo_dir}' for {repo_identifier}.")
|
||
else:
|
||
shutil.rmtree(repo_dir)
|
||
print(f"Deleted repository directory '{repo_dir}' for {repo_identifier}.")
|
||
else:
|
||
print(f"Repository directory '{repo_dir}' not found for {repo_identifier}.")
|
||
|
||
def update_repos(selected_repos, repositories_base_dir, bin_dir, all_repos:[], no_verification:bool, system_update=False, preview=False, quiet=False):
|
||
git_default_exec(selected_repos, repositories_base_dir, all_repos, extra_args=[],command="pull", preview=preview)
|
||
install_repos(selected_repos, repositories_base_dir, bin_dir, all_repos, no_verification, preview=preview, quiet=quiet)
|
||
if system_update:
|
||
run_command("yay -Syu", preview=preview)
|
||
run_command("sudo pacman -Syyu", preview=preview)
|
||
|
||
def git_default_exec(selected_repos, repositories_base_dir, all_repos, extra_args, command:str, preview=False):
|
||
exec_git_command(selected_repos, repositories_base_dir, all_repos, command, extra_args, preview)
|
||
|
||
def show_config(selected_repos, full_config=False):
|
||
"""Display configuration for one or more repositories, or the entire merged config."""
|
||
if full_config:
|
||
merged = load_config()
|
||
print(yaml.dump(merged, default_flow_style=False))
|
||
else:
|
||
for repo in selected_repos:
|
||
identifier = f'{repo.get("provider")}/{repo.get("account")}/{repo.get("repository")}'
|
||
print(f"Repository: {identifier}")
|
||
for key, value in repo.items():
|
||
print(f" {key}: {value}")
|
||
print("-" * 40)
|
||
|
||
def interactive_add(config):
|
||
"""Interactively prompt the user to add a new repository entry to the user config."""
|
||
print("Adding a new repository configuration entry.")
|
||
new_entry = {}
|
||
new_entry["provider"] = input("Provider (e.g., github.com): ").strip()
|
||
new_entry["account"] = input("Account (e.g., yourusername): ").strip()
|
||
new_entry["repository"] = input("Repository name (e.g., mytool): ").strip()
|
||
new_entry["verified"] = input("Verified commit id: ").strip()
|
||
new_entry["command"] = input("Command (optional, leave blank to auto-detect): ").strip()
|
||
new_entry["description"] = input("Description (optional): ").strip()
|
||
new_entry["replacement"] = input("Replacement (optional): ").strip()
|
||
new_entry["setup"] = input("Setup command (optional): ").strip()
|
||
new_entry["teardown"] = input("Teardown command (optional): ").strip()
|
||
new_entry["alias"] = input("Alias (optional): ").strip()
|
||
# Allow the user to mark this entry as ignored.
|
||
ignore_val = input("Ignore this entry? (y/N): ").strip().lower()
|
||
if ignore_val == "y":
|
||
new_entry["ignore"] = True
|
||
|
||
print("\nNew entry:")
|
||
for key, value in new_entry.items():
|
||
if value:
|
||
print(f"{key}: {value}")
|
||
confirm = input("Add this entry to user config? (y/N): ").strip().lower()
|
||
if confirm == "y":
|
||
if os.path.exists(USER_CONFIG_PATH):
|
||
with open(USER_CONFIG_PATH, 'r') as f:
|
||
user_config = yaml.safe_load(f) or {}
|
||
else:
|
||
user_config = {"repositories": []}
|
||
user_config.setdefault("repositories", [])
|
||
user_config["repositories"].append(new_entry)
|
||
save_user_config(user_config)
|
||
else:
|
||
print("Entry not added.")
|
||
|
||
def edit_config():
|
||
"""Open the user configuration file in nano."""
|
||
run_command(f"nano {USER_CONFIG_PATH}")
|
||
|
||
def config_init(user_config, defaults_config, bin_dir):
|
||
"""
|
||
Scan the base directory (defaults_config["base"]) for repositories.
|
||
The folder structure is assumed to be:
|
||
{base}/{provider}/{account}/{repository}
|
||
For each repository found, automatically determine:
|
||
- provider, account, repository from folder names.
|
||
- verified: the latest commit (via 'git log -1 --format=%H').
|
||
- alias: generated from the repository name using generate_alias().
|
||
Repositories already defined in defaults_config["repositories"] or user_config["repositories"] are skipped.
|
||
"""
|
||
repositories_base_dir = os.path.expanduser(defaults_config["directories"]["repositories"])
|
||
if not os.path.isdir(repositories_base_dir):
|
||
print(f"Base directory '{repositories_base_dir}' does not exist.")
|
||
return
|
||
|
||
default_keys = {(entry.get("provider"), entry.get("account"), entry.get("repository"))
|
||
for entry in defaults_config.get("repositories", [])}
|
||
existing_keys = {(entry.get("provider"), entry.get("account"), entry.get("repository"))
|
||
for entry in user_config.get("repositories", [])}
|
||
existing_aliases = {entry.get("alias") for entry in user_config.get("repositories", []) if entry.get("alias")}
|
||
|
||
new_entries = []
|
||
for provider in os.listdir(repositories_base_dir):
|
||
provider_path = os.path.join(repositories_base_dir, provider)
|
||
if not os.path.isdir(provider_path):
|
||
continue
|
||
for account in os.listdir(provider_path):
|
||
account_path = os.path.join(provider_path, account)
|
||
if not os.path.isdir(account_path):
|
||
continue
|
||
for repo_name in os.listdir(account_path):
|
||
repo_path = os.path.join(account_path, repo_name)
|
||
if not os.path.isdir(repo_path):
|
||
continue
|
||
key = (provider, account, repo_name)
|
||
if key in default_keys or key in existing_keys:
|
||
continue
|
||
try:
|
||
result = subprocess.run(
|
||
["git", "log", "-1", "--format=%H"],
|
||
cwd=repo_path,
|
||
stdout=subprocess.PIPE,
|
||
stderr=subprocess.PIPE,
|
||
text=True,
|
||
check=True,
|
||
)
|
||
verified = result.stdout.strip()
|
||
except Exception as e:
|
||
verified = ""
|
||
print(f"Could not determine latest commit for {repo_name} ({provider}/{account}): {e}")
|
||
|
||
entry = {
|
||
"provider": provider,
|
||
"account": account,
|
||
"repository": repo_name,
|
||
"verified": verified,
|
||
"ignore": True
|
||
}
|
||
alias = generate_alias({"repository": repo_name, "provider": provider, "account": account}, bin_dir, existing_aliases)
|
||
entry["alias"] = alias
|
||
existing_aliases.add(alias)
|
||
new_entries.append(entry)
|
||
print(f"Adding new repo entry: {entry}")
|
||
|
||
if new_entries:
|
||
user_config.setdefault("repositories", []).extend(new_entries)
|
||
save_user_config(user_config)
|
||
else:
|
||
print("No new repositories found.")
|
||
|
||
def get_selected_repos(show_all:bool,all_repos_list,identifiers=None):
|
||
selected = all_repos_list if show_all or (not identifiers) else resolve_repos(identifiers, all_repos_list)
|
||
return filter_ignored(selected)
|
||
|
||
def list_repositories(all_repos, repositories_base_dir, bin_dir, search_filter="", status_filter=""):
|
||
"""
|
||
List all repositories with their attributes and status information.
|
||
|
||
Parameters:
|
||
all_repos (list): List of repository configurations.
|
||
repositories_base_dir (str): The base directory where repositories are located.
|
||
bin_dir (str): The directory where executable wrappers are stored.
|
||
search_filter (str): Filter for repository attributes (case insensitive).
|
||
status_filter (str): Filter for computed status info (case insensitive).
|
||
|
||
For each repository, the identifier is printed in bold, the description (if available)
|
||
in italic, then all other attributes and computed status are printed.
|
||
If the repository is installed, a hint is displayed under the attributes.
|
||
Repositories are filtered out if either the search_filter is not found in any attribute or
|
||
if the status_filter is not found in the computed status string.
|
||
"""
|
||
search_filter = search_filter.lower() if search_filter else ""
|
||
status_filter = status_filter.lower() if status_filter else ""
|
||
|
||
# Define status colors using colors not used for other attributes:
|
||
# Avoid red (for ignore), blue (for homepage) and yellow (for verified).
|
||
status_colors = {
|
||
"Installed": "\033[1;32m", # Green
|
||
"Not Installed": "\033[1;35m", # Magenta
|
||
"Cloned": "\033[1;36m", # Cyan
|
||
"Clonable": "\033[1;37m", # White
|
||
"Ignored": "\033[38;5;208m", # Orange (extended)
|
||
"Active": "\033[38;5;129m", # Light Purple (extended)
|
||
"Installable": "\033[38;5;82m" # Light Green (extended)
|
||
}
|
||
|
||
for repo in all_repos:
|
||
# Combine all attribute values into one string for filtering.
|
||
repo_text = " ".join(str(v) for v in repo.values()).lower()
|
||
if search_filter and search_filter not in repo_text:
|
||
continue
|
||
|
||
# Compute status information for the repository.
|
||
identifier = get_repo_identifier(repo, all_repos)
|
||
executable_path = os.path.join(bin_dir, identifier)
|
||
repo_dir = get_repo_dir(repositories_base_dir, repo)
|
||
status_list = []
|
||
|
||
# Check if the executable exists (Installed).
|
||
if os.path.exists(executable_path):
|
||
status_list.append("Installed")
|
||
else:
|
||
status_list.append("Not Installed")
|
||
# Check if the repository directory exists (Cloned).
|
||
if os.path.exists(repo_dir):
|
||
status_list.append("Cloned")
|
||
else:
|
||
status_list.append("Clonable")
|
||
# Mark ignored repositories.
|
||
if repo.get("ignore", False):
|
||
status_list.append("Ignored")
|
||
else:
|
||
status_list.append("Active")
|
||
# Define installable as cloned but not installed.
|
||
if os.path.exists(repo_dir) and not os.path.exists(executable_path):
|
||
status_list.append("Installable")
|
||
|
||
# Build a colored status string.
|
||
colored_statuses = [f"{status_colors.get(s, '')}{s}\033[0m" for s in status_list]
|
||
status_str = ", ".join(colored_statuses)
|
||
|
||
# If a status_filter is provided, only display repos whose status contains the filter.
|
||
if status_filter and status_filter not in status_str.lower():
|
||
continue
|
||
|
||
# Display repository details:
|
||
# Print the identifier in bold.
|
||
print(f"\033[1m{identifier}\033[0m")
|
||
# Print the description in italic if it exists.
|
||
description = repo.get("description")
|
||
if description:
|
||
print(f"\n\033[3m{description}\033[0m")
|
||
print("\nAttributes:")
|
||
# Loop through all attributes.
|
||
for key, value in repo.items():
|
||
formatted_value = str(value)
|
||
# Special formatting for "verified" attribute (yellow).
|
||
if key == "verified" and value:
|
||
formatted_value = f"\033[1;33m{value}\033[0m"
|
||
# Special formatting for "ignore" flag (red if True).
|
||
if key == "ignore" and value:
|
||
formatted_value = f"\033[1;31m{value}\033[0m"
|
||
if key == "description":
|
||
continue
|
||
# Highlight homepage in blue.
|
||
if key.lower() == "homepage" and value:
|
||
formatted_value = f"\033[1;34m{value}\033[0m"
|
||
print(f" {key}: {formatted_value}")
|
||
# Always display the computed status.
|
||
print(f" Status: {status_str}")
|
||
# If the repository is installed, display a hint for more info.
|
||
if os.path.exists(executable_path):
|
||
print(f"\nMore information and help: \033[1;4m{identifier} --help\033[0m\n")
|
||
print("-" * 40)
|
||
# Main program.
|
||
if __name__ == "__main__":
|
||
config_merged = load_config()
|
||
repositories_base_dir = os.path.expanduser(config_merged["directories"]["repositories"])
|
||
all_repos_list = config_merged["repositories"]
|
||
description_text = """\
|
||
\033[1;32mPackage Manager 🤖📦\033[0m
|
||
|
||
\033[1mOverview:\033[0m
|
||
A configurable Python tool to manage multiple repositories via a unified command-line interface.
|
||
Drafted by \033[1;34mKevin Veen-Birkenbach\033[0m (\033[4mhttps://www.veen.world/\033[0m),
|
||
this tool automates common Git operations (clone, pull, push, status, etc.) and creates executable wrappers
|
||
and custom aliases to simplify your workflow.
|
||
|
||
\033[1mFeatures:\033[0m
|
||
• \033[1;33mAuto-install & Setup:\033[0m Automatically detect and set up repositories.
|
||
• \033[1;33mGit Command Integration:\033[0m Execute Git commands with extra parameters.
|
||
• \033[1;33mExplorer & Terminal Support:\033[0m Open repositories in your file manager or a new terminal tab.
|
||
• \033[1;33mComprehensive Configuration:\033[0m Manage settings via YAML files (default & user-specific).
|
||
|
||
For detailed help on each command, use:
|
||
\033[1m pkgmgr <command> --help\033[0m
|
||
"""
|
||
|
||
parser = argparse.ArgumentParser(description=description_text,formatter_class=argparse.RawTextHelpFormatter)
|
||
subparsers = parser.add_subparsers(dest="command", help="Subcommands")
|
||
def add_identifier_arguments(subparser):
|
||
subparser.add_argument("identifiers", nargs="*", help="Identifier(s) for repositories")
|
||
subparser.add_argument("--all", action="store_true", default=False, help="Apply to all repositories in the config")
|
||
subparser.add_argument("--preview", action="store_true", help="Preview changes without executing commands")
|
||
subparser.add_argument("--list", action="store_true", help="List affected repositories (with preview or status)")
|
||
subparser.add_argument("extra_args", nargs=argparse.REMAINDER, help="Extra arguments for the git command")
|
||
|
||
install_parser = subparsers.add_parser("install", help="Install repository/repositories")
|
||
add_identifier_arguments(install_parser)
|
||
install_parser.add_argument("-q", "--quiet", action="store_true", help="Suppress warnings and info messages")
|
||
install_parser.add_argument("--no-verification", default=False, action="store_true", help="Disable verification of repository commit")
|
||
|
||
deinstall_parser = subparsers.add_parser("deinstall", help="Deinstall repository/repositories")
|
||
add_identifier_arguments(deinstall_parser)
|
||
|
||
delete_parser = subparsers.add_parser("delete", help="Delete repository directory for repository/repositories")
|
||
add_identifier_arguments(delete_parser)
|
||
|
||
update_parser = subparsers.add_parser("update", help="Update (pull + install) repository/repositories")
|
||
add_identifier_arguments(update_parser)
|
||
update_parser.add_argument("--system", action="store_true", help="Include system update commands")
|
||
update_parser.add_argument("-q", "--quiet", action="store_true", help="Suppress warnings and info messages")
|
||
update_parser.add_argument("--no-verification", action="store_true", default=False, help="Disable verification of repository commit")
|
||
|
||
status_parser = subparsers.add_parser("status", help="Show status for repository/repositories or system")
|
||
add_identifier_arguments(status_parser)
|
||
status_parser.add_argument("--system", action="store_true", help="Show system status")
|
||
|
||
config_parser = subparsers.add_parser("config", help="Manage configuration")
|
||
config_subparsers = config_parser.add_subparsers(dest="subcommand", help="Config subcommands", required=True)
|
||
config_show = config_subparsers.add_parser("show", help="Show configuration")
|
||
add_identifier_arguments(config_show)
|
||
config_add = config_subparsers.add_parser("add", help="Interactively add a new repository entry")
|
||
config_edit = config_subparsers.add_parser("edit", help="Edit configuration file with nano")
|
||
config_init_parser = config_subparsers.add_parser("init", help="Initialize user configuration by scanning the base directory")
|
||
config_delete = config_subparsers.add_parser("delete", help="Delete repository entry from user config")
|
||
add_identifier_arguments(config_delete)
|
||
config_ignore = config_subparsers.add_parser("ignore", help="Set ignore flag for repository entries in user config")
|
||
add_identifier_arguments(config_ignore)
|
||
config_ignore.add_argument("--set", choices=["true", "false"], required=True, help="Set ignore to true or false")
|
||
path_parser = subparsers.add_parser("path", help="Print the path(s) of repository/repositories")
|
||
add_identifier_arguments(path_parser)
|
||
explor_parser = subparsers.add_parser("explor", help="Open repository in Nautilus file manager")
|
||
add_identifier_arguments(explor_parser)
|
||
|
||
terminal_parser = subparsers.add_parser("terminal", help="Open repository in a new GNOME Terminal tab")
|
||
add_identifier_arguments(terminal_parser)
|
||
|
||
code_parser = subparsers.add_parser("code", help="Open repository workspace with VS Code")
|
||
add_identifier_arguments(code_parser)
|
||
|
||
list_parser = subparsers.add_parser("list", help="List all repositories with details and status")
|
||
list_parser.add_argument("--search", default="", help="Filter repositories that contain the given string")
|
||
list_parser.add_argument("--status", type=str, default="", help="Filter repositories by status (case insensitive)")
|
||
|
||
# Proxies the default git commands
|
||
for git_command in GIT_DEFAULT_COMMANDS:
|
||
add_identifier_arguments(
|
||
subparsers.add_parser(
|
||
git_command,
|
||
help=f"Proxies 'git {git_command}' to one repository/repositories",
|
||
description=f"Executes 'git {git_command}' for the identified repos.\nTo recieve more help execute 'git {git_command} --help'",
|
||
formatter_class=argparse.RawTextHelpFormatter
|
||
)
|
||
)
|
||
|
||
args = parser.parse_args()
|
||
|
||
# Dispatch commands.
|
||
if args.command == "install":
|
||
selected = get_selected_repos(args.all,all_repos_list,args.identifiers)
|
||
install_repos(selected,repositories_base_dir, BIN_DIR, all_repos_list, args.no_verification, preview=args.preview, quiet=args.quiet)
|
||
elif args.command in GIT_DEFAULT_COMMANDS:
|
||
selected = get_selected_repos(args.all,all_repos_list,args.identifiers)
|
||
if args.command == "clone":
|
||
clone_repos(selected, repositories_base_dir, all_repos_list, args.preview)
|
||
else:
|
||
git_default_exec(selected, repositories_base_dir, all_repos_list, args.extra_args, args.command, preview=args.preview)
|
||
elif args.command == "list":
|
||
list_repositories(all_repos_list, repositories_base_dir, BIN_DIR, search_filter=args.search, status_filter=args.status)
|
||
elif args.command == "deinstall":
|
||
selected = get_selected_repos(args.all,all_repos_list,args.identifiers)
|
||
deinstall_repos(selected,repositories_base_dir, BIN_DIR, all_repos_list, preview=args.preview)
|
||
elif args.command == "delete":
|
||
selected = get_selected_repos(args.all,all_repos_list,args.identifiers)
|
||
delete_repos(selected,repositories_base_dir, all_repos_list, preview=args.preview)
|
||
elif args.command == "update":
|
||
selected = get_selected_repos(args.all,all_repos_list,args.identifiers)
|
||
update_repos(selected,repositories_base_dir, BIN_DIR, all_repos_list, args.no_verification, system_update=args.system, preview=args.preview, quiet=args.quiet)
|
||
elif args.command == "status":
|
||
selected = get_selected_repos(args.all,all_repos_list,args.identifiers)
|
||
status_repos(selected,repositories_base_dir, all_repos_list, args.extra_args, list_only=args.list, system_status=args.system, preview=args.preview)
|
||
elif args.command == "explor":
|
||
selected = get_selected_repos(args.all, all_repos_list, args.identifiers)
|
||
for repo in selected:
|
||
repo_dir = get_repo_dir(repositories_base_dir, repo)
|
||
run_command(f"nautilus {repo_dir}")
|
||
elif args.command == "code":
|
||
selected = get_selected_repos(args.all, all_repos_list, args.identifiers)
|
||
if not selected:
|
||
print("No repositories selected.")
|
||
else:
|
||
# Ermittele die Identifier aller ausgewählter Repositories.
|
||
identifiers = [get_repo_identifier(repo, all_repos_list) for repo in selected]
|
||
sorted_identifiers = sorted(identifiers)
|
||
# Erstelle den Workspace-Dateinamen, z. B.: repo1_repo2_repo3.code-workspace
|
||
workspace_name = "_".join(sorted_identifiers) + ".code-workspace"
|
||
# Hole den Workspaces-Ordner aus der Config.
|
||
workspaces_dir = os.path.expanduser(config_merged.get("directories").get("workspaces"))
|
||
os.makedirs(workspaces_dir, exist_ok=True)
|
||
workspace_file = os.path.join(workspaces_dir, workspace_name)
|
||
|
||
# Erstelle die Workspace-Konfiguration mit allen Repositories als Folder-Eintrag.
|
||
folders = []
|
||
for repo in selected:
|
||
repo_dir = os.path.expanduser(get_repo_dir(repositories_base_dir, repo))
|
||
folders.append({"path": repo_dir})
|
||
|
||
workspace_data = {
|
||
"folders": folders,
|
||
"settings": {}
|
||
}
|
||
with open(workspace_file, "w") as f:
|
||
json.dump(workspace_data, f, indent=4)
|
||
print(f"Created workspace file: {workspace_file}")
|
||
run_command(f'code "{workspace_file}"')
|
||
|
||
|
||
elif args.command == "terminal":
|
||
selected = get_selected_repos(args.all, all_repos_list, args.identifiers)
|
||
for repo in selected:
|
||
repo_dir = get_repo_dir(repositories_base_dir, repo)
|
||
run_command(f'gnome-terminal --tab --working-directory="{repo_dir}"')
|
||
|
||
elif args.command == "path":
|
||
selected = get_selected_repos(args.all,all_repos_list,args.identifiers)
|
||
paths = [
|
||
get_repo_dir(repositories_base_dir,repo)
|
||
for repo in selected
|
||
]
|
||
print(" ".join(paths))
|
||
elif args.command == "config":
|
||
if args.subcommand == "show":
|
||
if args.all or (not args.identifiers):
|
||
show_config([], full_config=True)
|
||
else:
|
||
selected = resolve_repos(args.identifiers, all_repos_list)
|
||
if selected:
|
||
show_config(selected, full_config=False)
|
||
elif args.subcommand == "add":
|
||
interactive_add(config_merged)
|
||
elif args.subcommand == "edit":
|
||
edit_config()
|
||
elif args.subcommand == "init":
|
||
if os.path.exists(USER_CONFIG_PATH):
|
||
with open(USER_CONFIG_PATH, 'r') as f:
|
||
user_config = yaml.safe_load(f) or {}
|
||
else:
|
||
user_config = {"repositories": []}
|
||
config_init(user_config, config_merged, BIN_DIR)
|
||
elif args.subcommand == "delete":
|
||
# Load user config from USER_CONFIG_PATH.
|
||
if os.path.exists(USER_CONFIG_PATH):
|
||
with open(USER_CONFIG_PATH, 'r') as f:
|
||
user_config = yaml.safe_load(f) or {"repositories": []}
|
||
else:
|
||
user_config = {"repositories": []}
|
||
if args.all or not args.identifiers:
|
||
print("You must specify identifiers to delete.")
|
||
else:
|
||
to_delete = resolve_repos(args.identifiers, user_config.get("repositories", []))
|
||
new_repos = [entry for entry in user_config.get("repositories", []) if entry not in to_delete]
|
||
user_config["repositories"] = new_repos
|
||
save_user_config(user_config)
|
||
print(f"Deleted {len(to_delete)} entries from user config.")
|
||
elif args.subcommand == "ignore":
|
||
# Load user config from USER_CONFIG_PATH.
|
||
if os.path.exists(USER_CONFIG_PATH):
|
||
with open(USER_CONFIG_PATH, 'r') as f:
|
||
user_config = yaml.safe_load(f) or {"repositories": []}
|
||
else:
|
||
user_config = {"repositories": []}
|
||
if args.all or not args.identifiers:
|
||
print("You must specify identifiers to modify ignore flag.")
|
||
else:
|
||
to_modify = resolve_repos(args.identifiers, user_config.get("repositories", []))
|
||
for entry in user_config["repositories"]:
|
||
key = (entry.get("provider"), entry.get("account"), entry.get("repository"))
|
||
for mod in to_modify:
|
||
mod_key = (mod.get("provider"), mod.get("account"), mod.get("repository"))
|
||
if key == mod_key:
|
||
entry["ignore"] = (args.set == "true")
|
||
print(f"Set ignore for {key} to {entry['ignore']}")
|
||
save_user_config(user_config)
|
||
else:
|
||
parser.print_help()
|