Optimized workspace

This commit is contained in:
Kevin Veen-Birkenbach
2025-03-05 11:20:59 +01:00
parent 486f5b486c
commit 556e0df278
2 changed files with 118 additions and 82 deletions

194
main.py
View File

@@ -6,6 +6,7 @@ import shutil
import sys
import yaml
import argparse
import json
# Define configuration file paths.
DEFAULT_CONFIG_PATH = os.path.join("config", "defaults.yaml")
@@ -31,17 +32,17 @@ def load_config():
sys.exit(1)
with open(DEFAULT_CONFIG_PATH, 'r') as f:
config = yaml.safe_load(f)
if "base" not in config or "repos" not in config:
print("Default config file must contain 'base' and 'repos' keys.")
if "directories" not in config or "repositories" not in config:
print("Default config file must contain 'directories' and 'repositories' keys.")
sys.exit(1)
if os.path.exists(USER_CONFIG_PATH):
with open(USER_CONFIG_PATH, 'r') as f:
user_config = yaml.safe_load(f)
if user_config:
if "base" in user_config:
config["base"] = user_config["base"]
if "repos" in user_config:
config["repos"].extend(user_config["repos"])
if "directories" in user_config:
config["directories"] = user_config["directories"]
if "repositories" in user_config:
config["repositories"].extend(user_config["repositories"])
return config
def save_user_config(user_config):
@@ -142,7 +143,7 @@ def generate_alias(repo, bin_dir, existing_aliases):
candidate3 = candidate3[:12]
return candidate3
def create_executable(repo, base_dir, bin_dir, all_repos, quiet=False, preview=False, no_verification=False):
def create_executable(repo, repositories_base_dir, bin_dir, all_repos, quiet=False, preview=False, no_verification=False):
"""
Create an executable bash wrapper for the repository.
@@ -151,7 +152,7 @@ def create_executable(repo, base_dir, bin_dir, all_repos, quiet=False, preview=F
If an 'alias' field is provided, a symlink is created in bin_dir with that alias.
"""
repo_identifier = get_repo_identifier(repo, all_repos)
repo_dir = get_repo_dir(base_dir,repo)
repo_dir = get_repo_dir(repositories_base_dir,repo)
command = repo.get("command")
if not command:
main_sh = os.path.join(repo_dir, "main.sh")
@@ -214,24 +215,24 @@ cd "{repo_dir}"
if not quiet:
print(f"Error creating alias '{alias_name}': {e}")
def install_repos(selected_repos, base_dir, bin_dir, all_repos:[], no_verification:bool, preview=False, quiet=False):
def install_repos(selected_repos, repositories_base_dir, bin_dir, all_repos:[], no_verification:bool, preview=False, quiet=False):
"""Install repositories by creating executable wrappers and running setup."""
for repo in selected_repos:
repo_identifier = get_repo_identifier(repo, all_repos)
repo_dir = get_repo_dir(base_dir,repo)
repo_dir = get_repo_dir(repositories_base_dir,repo)
if not os.path.exists(repo_dir):
print(f"Repository directory '{repo_dir}' does not exist. Clone it first.")
continue
create_executable(repo, base_dir, bin_dir, all_repos, quiet=quiet, preview=preview, no_verification=no_verification)
create_executable(repo, repositories_base_dir, bin_dir, all_repos, quiet=quiet, preview=preview, no_verification=no_verification)
setup_cmd = repo.get("setup")
if setup_cmd:
run_command(setup_cmd, cwd=repo_dir, preview=preview)
def exec_git_command(selected_repos, base_dir, all_repos, git_cmd, extra_args, preview=False):
def exec_git_command(selected_repos, repositories_base_dir, all_repos, git_cmd, extra_args, preview=False):
"""Execute a given git command with extra arguments for each repository."""
for repo in selected_repos:
repo_identifier = get_repo_identifier(repo, all_repos)
repo_dir = get_repo_dir(base_dir,repo)
repo_dir = get_repo_dir(repositories_base_dir,repo)
if os.path.exists(repo_dir):
full_cmd = f"git {git_cmd} {' '.join(extra_args)}"
run_command(full_cmd, cwd=repo_dir, preview=preview)
@@ -239,7 +240,7 @@ def exec_git_command(selected_repos, base_dir, all_repos, git_cmd, extra_args, p
print(f"Repository directory '{repo_dir}' not found for {repo_identifier}.")
def status_repos(selected_repos, base_dir, all_repos, extra_args, list_only=False, system_status=False, preview=False):
def status_repos(selected_repos, repositories_base_dir, all_repos, extra_args, list_only=False, system_status=False, preview=False):
if system_status:
print("System status:")
run_command("yay -Qu", preview=preview)
@@ -247,13 +248,13 @@ def status_repos(selected_repos, base_dir, all_repos, extra_args, list_only=Fals
for repo in selected_repos:
print(get_repo_identifier(repo, all_repos))
else:
exec_git_command(selected_repos, base_dir, all_repos, "status", extra_args, preview)
exec_git_command(selected_repos, repositories_base_dir, all_repos, "status", extra_args, preview)
def get_repo_dir(base_dir:str,repo:{})->str:
def get_repo_dir(repositories_base_dir:str,repo:{})->str:
try:
return os.path.join(base_dir, repo.get("provider"), repo.get("account"), repo.get("repository"))
return os.path.join(repositories_base_dir, repo.get("provider"), repo.get("account"), repo.get("repository"))
except TypeError as e:
if base_dir:
if repositories_base_dir:
print(f"Error: {e} \nThe repository {repo} seems not correct configured.\nPlease configure it correct.")
for key in ["provider","account","repository"]:
if not repo.get(key,False):
@@ -262,10 +263,10 @@ def get_repo_dir(base_dir:str,repo:{})->str:
print(f"Error: {e} \nThe base {base} seems not correct configured.\nPlease configure it correct.")
sys.exit(1)
def clone_repos(selected_repos, base_dir:str, all_repos, preview=False):
def clone_repos(selected_repos, repositories_base_dir:str, all_repos, preview=False):
for repo in selected_repos:
repo_identifier = get_repo_identifier(repo, all_repos)
repo_dir = get_repo_dir(base_dir,repo)
repo_dir = get_repo_dir(repositories_base_dir,repo)
if os.path.exists(repo_dir):
print(f"Repository '{repo_identifier}' already exists at '{repo_dir}'.")
continue
@@ -275,7 +276,7 @@ def clone_repos(selected_repos, base_dir:str, all_repos, preview=False):
os.makedirs(parent_dir, exist_ok=True)
run_command(f"git clone {clone_url} {repo_dir}", cwd=parent_dir, preview=preview)
def deinstall_repos(selected_repos, base_dir, bin_dir, all_repos, preview=False):
def deinstall_repos(selected_repos, repositories_base_dir, bin_dir, all_repos, preview=False):
for repo in selected_repos:
repo_identifier = get_repo_identifier(repo, all_repos)
alias_path = os.path.join(bin_dir, repo_identifier)
@@ -288,14 +289,14 @@ def deinstall_repos(selected_repos, base_dir, bin_dir, all_repos, preview=False)
else:
print(f"No executable found for {repo_identifier} in {bin_dir}.")
teardown_cmd = repo.get("teardown")
repo_dir = get_repo_dir(base_dir,repo)
repo_dir = get_repo_dir(repositories_base_dir,repo)
if teardown_cmd and os.path.exists(repo_dir):
run_command(teardown_cmd, cwd=repo_dir, preview=preview)
def delete_repos(selected_repos, base_dir, all_repos, preview=False):
def delete_repos(selected_repos, repositories_base_dir, all_repos, preview=False):
for repo in selected_repos:
repo_identifier = get_repo_identifier(repo, all_repos)
repo_dir = get_repo_dir(base_dir,repo)
repo_dir = get_repo_dir(repositories_base_dir,repo)
if os.path.exists(repo_dir):
if preview:
print(f"[Preview] Would delete directory '{repo_dir}' for {repo_identifier}.")
@@ -305,15 +306,15 @@ def delete_repos(selected_repos, base_dir, all_repos, preview=False):
else:
print(f"Repository directory '{repo_dir}' not found for {repo_identifier}.")
def update_repos(selected_repos, base_dir, bin_dir, all_repos:[], no_verification:bool, system_update=False, preview=False, quiet=False):
git_default_exec(selected_repos, base_dir, all_repos, extra_args=[],command="pull", preview=preview)
install_repos(selected_repos, base_dir, bin_dir, all_repos, no_verification, preview=preview, quiet=quiet)
def update_repos(selected_repos, repositories_base_dir, bin_dir, all_repos:[], no_verification:bool, system_update=False, preview=False, quiet=False):
git_default_exec(selected_repos, repositories_base_dir, all_repos, extra_args=[],command="pull", preview=preview)
install_repos(selected_repos, repositories_base_dir, bin_dir, all_repos, no_verification, preview=preview, quiet=quiet)
if system_update:
run_command("yay -Syu", preview=preview)
run_command("sudo pacman -Syyu", preview=preview)
def git_default_exec(selected_repos, base_dir, all_repos, extra_args, command:str, preview=False):
exec_git_command(selected_repos, base_dir, all_repos, command, extra_args, preview)
def git_default_exec(selected_repos, repositories_base_dir, all_repos, extra_args, command:str, preview=False):
exec_git_command(selected_repos, repositories_base_dir, all_repos, command, extra_args, preview)
def show_config(selected_repos, full_config=False):
"""Display configuration for one or more repositories, or the entire merged config."""
@@ -357,9 +358,9 @@ def interactive_add(config):
with open(USER_CONFIG_PATH, 'r') as f:
user_config = yaml.safe_load(f) or {}
else:
user_config = {"repos": []}
user_config.setdefault("repos", [])
user_config["repos"].append(new_entry)
user_config = {"repositories": []}
user_config.setdefault("repositories", [])
user_config["repositories"].append(new_entry)
save_user_config(user_config)
else:
print("Entry not added.")
@@ -377,22 +378,22 @@ def config_init(user_config, defaults_config, bin_dir):
- provider, account, repository from folder names.
- verified: the latest commit (via 'git log -1 --format=%H').
- alias: generated from the repository name using generate_alias().
Repositories already defined in defaults_config["repos"] or user_config["repos"] are skipped.
Repositories already defined in defaults_config["repositories"] or user_config["repositories"] are skipped.
"""
base_dir = os.path.expanduser(defaults_config["base"])
if not os.path.isdir(base_dir):
print(f"Base directory '{base_dir}' does not exist.")
repositories_base_dir = os.path.expanduser(defaults_config["directories"]["repositories"])
if not os.path.isdir(repositories_base_dir):
print(f"Base directory '{repositories_base_dir}' does not exist.")
return
default_keys = {(entry.get("provider"), entry.get("account"), entry.get("repository"))
for entry in defaults_config.get("repos", [])}
for entry in defaults_config.get("repositories", [])}
existing_keys = {(entry.get("provider"), entry.get("account"), entry.get("repository"))
for entry in user_config.get("repos", [])}
existing_aliases = {entry.get("alias") for entry in user_config.get("repos", []) if entry.get("alias")}
for entry in user_config.get("repositories", [])}
existing_aliases = {entry.get("alias") for entry in user_config.get("repositories", []) if entry.get("alias")}
new_entries = []
for provider in os.listdir(base_dir):
provider_path = os.path.join(base_dir, provider)
for provider in os.listdir(repositories_base_dir):
provider_path = os.path.join(repositories_base_dir, provider)
if not os.path.isdir(provider_path):
continue
for account in os.listdir(provider_path):
@@ -434,12 +435,12 @@ def config_init(user_config, defaults_config, bin_dir):
print(f"Adding new repo entry: {entry}")
if new_entries:
user_config.setdefault("repos", []).extend(new_entries)
user_config.setdefault("repositories", []).extend(new_entries)
save_user_config(user_config)
else:
print("No new repositories found.")
def get_selected_repos(show_all: bool, all_repos_list, base_dir, identifiers=None):
def get_selected_repos(show_all: bool, all_repos_list, repositories_base_dir, identifiers=None):
"""
Select repositories based on provided identifiers or, if none are given,
use the repository that matches the current working directory (if any).
@@ -458,7 +459,7 @@ def get_selected_repos(show_all: bool, all_repos_list, base_dir, identifiers=Non
cwd = os.path.abspath(os.getcwd())
selected = []
for repo in all_repos_list:
repo_dir = os.path.abspath(get_repo_dir(base_dir, repo))
repo_dir = os.path.abspath(get_repo_dir(repositories_base_dir, repo))
if cwd.startswith(repo_dir):
selected.append(repo)
if not selected:
@@ -466,13 +467,13 @@ def get_selected_repos(show_all: bool, all_repos_list, base_dir, identifiers=Non
selected = all_repos_list
return filter_ignored(selected)
def list_repositories(all_repos, base_dir, bin_dir, search_filter="", status_filter=""):
def list_repositories(all_repos, repositories_base_dir, bin_dir, search_filter="", status_filter=""):
"""
List all repositories with their attributes and status information.
Parameters:
all_repos (list): List of repository configurations.
base_dir (str): The base directory where repositories are located.
repositories_base_dir (str): The base directory where repositories are located.
bin_dir (str): The directory where executable wrappers are stored.
search_filter (str): Filter for repository attributes (case insensitive).
status_filter (str): Filter for computed status info (case insensitive).
@@ -495,7 +496,7 @@ def list_repositories(all_repos, base_dir, bin_dir, search_filter="", status_fil
"Clonable": "\033[1;37m", # White
"Ignored": "\033[38;5;208m", # Orange (extended)
"Active": "\033[38;5;129m", # Light Purple (extended)
"Installable": "\033[38;5;82m" # Light Green (extended)
"Installable": "\033[38;5;82m" # Light Green (extended)
}
for repo in all_repos:
@@ -507,7 +508,7 @@ def list_repositories(all_repos, base_dir, bin_dir, search_filter="", status_fil
# Compute status information for the repository.
identifier = get_repo_identifier(repo, all_repos)
executable_path = os.path.join(bin_dir, identifier)
repo_dir = get_repo_dir(base_dir, repo)
repo_dir = get_repo_dir(repositories_base_dir, repo)
status_list = []
# Check if the executable exists (Installed).
@@ -569,8 +570,8 @@ def list_repositories(all_repos, base_dir, bin_dir, search_filter="", status_fil
# Main program.
if __name__ == "__main__":
config_merged = load_config()
base_dir = os.path.expanduser(config_merged["base"])
all_repos_list = config_merged["repos"]
repositories_base_dir = os.path.expanduser(config_merged["directories"]["repositories"])
all_repos_list = config_merged["repositories"]
description_text = """\
\033[1;32mPackage Manager 🤖📦\033[0m
@@ -639,6 +640,9 @@ For detailed help on each command, use:
terminal_parser = subparsers.add_parser("terminal", help="Open repository in a new GNOME Terminal tab")
add_identifier_arguments(terminal_parser)
code_parser = subparsers.add_parser("code", help="Open repository workspace with VS Code")
add_identifier_arguments(code_parser)
list_parser = subparsers.add_parser("list", help="List all repositories with details and status")
list_parser.add_argument("--search", default="", help="Filter repositories that contain the given string")
@@ -654,44 +658,74 @@ For detailed help on each command, use:
# Dispatch commands.
if args.command == "install":
selected = selected = get_selected_repos(args.all, all_repos_list, base_dir, args.identifiers)
install_repos(selected, base_dir, BIN_DIR, all_repos_list, args.no_verification, preview=args.preview, quiet=args.quiet)
selected = selected = get_selected_repos(args.all, all_repos_list, repositories_base_dir, args.identifiers)
install_repos(selected, repositories_base_dir, BIN_DIR, all_repos_list, args.no_verification, preview=args.preview, quiet=args.quiet)
elif args.command in GIT_DEFAULT_COMMANDS:
selected = get_selected_repos(args.all, all_repos_list, base_dir, args.identifiers)
selected = get_selected_repos(args.all, all_repos_list, repositories_base_dir, args.identifiers)
if args.command == "clone":
clone_repos(selected, base_dir, all_repos_list, args.preview)
clone_repos(selected, repositories_base_dir, all_repos_list, args.preview)
else:
git_default_exec(selected, base_dir, all_repos_list, args.extra_args, args.command, preview=args.preview)
git_default_exec(selected, repositories_base_dir, all_repos_list, args.extra_args, args.command, preview=args.preview)
elif args.command == "list":
list_repositories(all_repos_list, base_dir, BIN_DIR, search_filter=args.search, status_filter=args.status)
list_repositories(all_repos_list, repositories_base_dir, BIN_DIR, search_filter=args.search, status_filter=args.status)
elif args.command == "deinstall":
selected = selected = get_selected_repos(args.all, all_repos_list, base_dir, args.identifiers)
deinstall_repos(selected, base_dir, BIN_DIR, all_repos_list, preview=args.preview)
selected = selected = get_selected_repos(args.all, all_repos_list, repositories_base_dir, args.identifiers)
deinstall_repos(selected, repositories_base_dir, BIN_DIR, all_repos_list, preview=args.preview)
elif args.command == "delete":
selected = selected = get_selected_repos(args.all, all_repos_list, base_dir, args.identifiers)
delete_repos(selected, base_dir, all_repos_list, preview=args.preview)
selected = selected = get_selected_repos(args.all, all_repos_list, repositories_base_dir, args.identifiers)
delete_repos(selected, repositories_base_dir, all_repos_list, preview=args.preview)
elif args.command == "update":
selected = selected = get_selected_repos(args.all, all_repos_list, base_dir, args.identifiers)
update_repos(selected, base_dir, BIN_DIR, all_repos_list, args.no_verification, system_update=args.system, preview=args.preview, quiet=args.quiet)
selected = selected = get_selected_repos(args.all, all_repos_list, repositories_base_dir, args.identifiers)
update_repos(selected, repositories_base_dir, BIN_DIR, all_repos_list, args.no_verification, system_update=args.system, preview=args.preview, quiet=args.quiet)
elif args.command == "status":
selected = selected = get_selected_repos(args.all, all_repos_list, base_dir, args.identifiers)
status_repos(selected, base_dir, all_repos_list, args.extra_args, list_only=args.list, system_status=args.system, preview=args.preview)
selected = selected = get_selected_repos(args.all, all_repos_list, repositories_base_dir, args.identifiers)
status_repos(selected, repositories_base_dir, all_repos_list, args.extra_args, list_only=args.list, system_status=args.system, preview=args.preview)
elif args.command == "explor":
selected = get_selected_repos(args.all, all_repos_list, base_dir, args.identifiers)
selected = get_selected_repos(args.all, all_repos_list, repositories_base_dir, args.identifiers)
for repo in selected:
repo_dir = get_repo_dir(base_dir, repo)
repo_dir = get_repo_dir(repositories_base_dir, repo)
run_command(f"nautilus {repo_dir}")
elif args.command == "code":
selected = get_selected_repos(args.all, all_repos_list, repositories_base_dir, args.identifiers)
if not selected:
print("No repositories selected.")
else:
# Ermittele die Identifier aller ausgewählter Repositories.
identifiers = [get_repo_identifier(repo, all_repos_list) for repo in selected]
sorted_identifiers = sorted(identifiers)
# Erstelle den Workspace-Dateinamen, z.B.: repo1_repo2_repo3.code-workspace
workspace_name = "_".join(sorted_identifiers) + ".code-workspace"
# Hole den Workspaces-Ordner aus der Config.
workspaces_dir = os.path.expanduser(config_merged.get("directories").get("workspaces"))
os.makedirs(workspaces_dir, exist_ok=True)
workspace_file = os.path.join(workspaces_dir, workspace_name)
# Erstelle die Workspace-Konfiguration mit allen Repositories als Folder-Eintrag.
folders = []
for repo in selected:
repo_dir = os.path.expanduser(get_repo_dir(repositories_base_dir, repo))
folders.append({"path": repo_dir})
workspace_data = {
"folders": folders,
"settings": {}
}
with open(workspace_file, "w") as f:
json.dump(workspace_data, f, indent=4)
print(f"Created workspace file: {workspace_file}")
run_command(f'code "{workspace_file}"')
elif args.command == "terminal":
selected = get_selected_repos(args.all, all_repos_list, base_dir, args.identifiers)
selected = get_selected_repos(args.all, all_repos_list, repositories_base_dir, args.identifiers)
for repo in selected:
repo_dir = get_repo_dir(base_dir, repo)
repo_dir = get_repo_dir(repositories_base_dir, repo)
run_command(f'gnome-terminal --tab --working-directory="{repo_dir}"')
elif args.command == "path":
selected = selected = get_selected_repos(args.all, all_repos_list, base_dir, args.identifiers)
selected = selected = get_selected_repos(args.all, all_repos_list, repositories_base_dir, args.identifiers)
paths = [
get_repo_dir(base_dir,repo)
get_repo_dir(repositories_base_dir,repo)
for repo in selected
]
print(" ".join(paths))
@@ -712,35 +746,35 @@ For detailed help on each command, use:
with open(USER_CONFIG_PATH, 'r') as f:
user_config = yaml.safe_load(f) or {}
else:
user_config = {"repos": []}
user_config = {"repositories": []}
config_init(user_config, config_merged, BIN_DIR)
elif args.subcommand == "delete":
# Load user config from USER_CONFIG_PATH.
if os.path.exists(USER_CONFIG_PATH):
with open(USER_CONFIG_PATH, 'r') as f:
user_config = yaml.safe_load(f) or {"repos": []}
user_config = yaml.safe_load(f) or {"repositories": []}
else:
user_config = {"repos": []}
user_config = {"repositories": []}
if args.all or not args.identifiers:
print("You must specify identifiers to delete.")
else:
to_delete = resolve_repos(args.identifiers, user_config.get("repos", []))
new_repos = [entry for entry in user_config.get("repos", []) if entry not in to_delete]
user_config["repos"] = new_repos
to_delete = resolve_repos(args.identifiers, user_config.get("repositories", []))
new_repos = [entry for entry in user_config.get("repositories", []) if entry not in to_delete]
user_config["repositories"] = new_repos
save_user_config(user_config)
print(f"Deleted {len(to_delete)} entries from user config.")
elif args.subcommand == "ignore":
# Load user config from USER_CONFIG_PATH.
if os.path.exists(USER_CONFIG_PATH):
with open(USER_CONFIG_PATH, 'r') as f:
user_config = yaml.safe_load(f) or {"repos": []}
user_config = yaml.safe_load(f) or {"repositories": []}
else:
user_config = {"repos": []}
user_config = {"repositories": []}
if args.all or not args.identifiers:
print("You must specify identifiers to modify ignore flag.")
else:
to_modify = resolve_repos(args.identifiers, user_config.get("repos", []))
for entry in user_config["repos"]:
to_modify = resolve_repos(args.identifiers, user_config.get("repositories", []))
for entry in user_config["repositories"]:
key = (entry.get("provider"), entry.get("account"), entry.get("repository"))
for mod in to_modify:
mod_key = (mod.get("provider"), mod.get("account"), mod.get("repository"))