* **Split mirror responsibilities into clear subcommands**
Some checks failed
CI / test-unit (push) Has been cancelled
CI / test-integration (push) Has been cancelled
CI / test-env-virtual (push) Has been cancelled
CI / test-env-nix (push) Has been cancelled
CI / test-e2e (push) Has been cancelled
CI / test-virgin-user (push) Has been cancelled
CI / test-virgin-root (push) Has been cancelled
CI / codesniffer-shellcheck (push) Has been cancelled
CI / codesniffer-ruff (push) Has been cancelled
Some checks failed
CI / test-unit (push) Has been cancelled
CI / test-integration (push) Has been cancelled
CI / test-env-virtual (push) Has been cancelled
CI / test-env-nix (push) Has been cancelled
CI / test-e2e (push) Has been cancelled
CI / test-virgin-user (push) Has been cancelled
CI / test-virgin-root (push) Has been cancelled
CI / codesniffer-shellcheck (push) Has been cancelled
CI / codesniffer-ruff (push) Has been cancelled
Setup configures local Git state, check validates remote reachability in a read-only way, and provision explicitly creates missing remote repositories. Destructive behavior is never implicit. * **Introduce a remote provisioning layer** pkgmgr can now ensure that repositories exist on remote providers. If a repository is missing, it can be created automatically on supported platforms when explicitly requested. * **Add a provider registry for extensibility** Providers are resolved based on the remote host, with optional hints to force a specific backend. This makes it straightforward to add further providers later without changing the core logic. * **Use a lightweight, dependency-free HTTP client** All API communication is handled via a small stdlib-based client. HTTP errors are mapped to meaningful domain errors, improving diagnostics and error handling consistency. * **Centralize credential resolution** API tokens are resolved in a strict order: environment variables first, then the system keyring, and finally an interactive prompt if allowed. This works well for both CI and interactive use. * **Keep keyring integration optional** Secure token storage via the OS keyring is provided as an optional dependency. If unavailable, pkgmgr still works using environment variables or one-off interactive tokens. * **Improve CLI parser safety and clarity** Shared argument helpers now guard against duplicate definitions, making composed subcommands more robust and easier to maintain. * **Expand end-to-end test coverage** All mirror-related workflows are exercised through real CLI invocations in preview mode, ensuring full wiring correctness while remaining safe for automated test environments. https://chatgpt.com/share/693df441-a780-800f-bcf7-96e06cc9e421
This commit is contained in:
@@ -27,8 +27,8 @@ Homepage = "https://s.veen.world/pkgmgr"
|
||||
Source = "https://github.com/kevinveenbirkenbach/package-manager"
|
||||
|
||||
[project.optional-dependencies]
|
||||
keyring = ["keyring>=24.0.0"]
|
||||
dev = [
|
||||
"pytest",
|
||||
"mypy"
|
||||
]
|
||||
|
||||
|
||||
@@ -1,14 +1,121 @@
|
||||
# src/pkgmgr/actions/mirror/setup_cmd.py
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import List, Tuple
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from pkgmgr.core.git import run_git, GitError
|
||||
from pkgmgr.core.git import GitError, run_git
|
||||
from pkgmgr.core.remote_provisioning import ProviderHint, RepoSpec, ensure_remote_repo
|
||||
from pkgmgr.core.remote_provisioning.ensure import EnsureOptions
|
||||
|
||||
from .context import build_context
|
||||
from .git_remote import determine_primary_remote_url, ensure_origin_remote
|
||||
from .types import Repository
|
||||
|
||||
|
||||
def _probe_mirror(url: str, repo_dir: str) -> Tuple[bool, str]:
|
||||
"""
|
||||
Probe a remote mirror URL using `git ls-remote`.
|
||||
|
||||
Returns:
|
||||
(True, "") on success,
|
||||
(False, error_message) on failure.
|
||||
"""
|
||||
try:
|
||||
run_git(["ls-remote", url], cwd=repo_dir)
|
||||
return True, ""
|
||||
except GitError as exc:
|
||||
return False, str(exc)
|
||||
|
||||
|
||||
def _host_from_git_url(url: str) -> str:
|
||||
url = (url or "").strip()
|
||||
if not url:
|
||||
return ""
|
||||
|
||||
if "://" in url:
|
||||
parsed = urlparse(url)
|
||||
netloc = (parsed.netloc or "").strip()
|
||||
if "@" in netloc:
|
||||
netloc = netloc.split("@", 1)[1]
|
||||
# keep optional :port
|
||||
return netloc
|
||||
|
||||
# scp-like: git@host:owner/repo.git
|
||||
if "@" in url and ":" in url:
|
||||
after_at = url.split("@", 1)[1]
|
||||
host = after_at.split(":", 1)[0]
|
||||
return host.strip()
|
||||
|
||||
return url.split("/", 1)[0].strip()
|
||||
|
||||
def _ensure_remote_repository(
|
||||
repo: Repository,
|
||||
repositories_base_dir: str,
|
||||
all_repos: List[Repository],
|
||||
preview: bool,
|
||||
) -> None:
|
||||
"""
|
||||
Ensure that the remote repository exists using provider APIs.
|
||||
|
||||
This is ONLY called when ensure_remote=True.
|
||||
"""
|
||||
ctx = build_context(repo, repositories_base_dir, all_repos)
|
||||
resolved_mirrors = ctx.resolved_mirrors
|
||||
|
||||
primary_url = determine_primary_remote_url(repo, resolved_mirrors)
|
||||
if not primary_url:
|
||||
print("[INFO] No remote URL could be derived; skipping remote provisioning.")
|
||||
return
|
||||
|
||||
# IMPORTANT:
|
||||
# - repo["provider"] is typically a provider *kind* (e.g. "github" / "gitea"),
|
||||
# NOT a hostname. We derive the actual host from the remote URL.
|
||||
host = _host_from_git_url(primary_url)
|
||||
owner = repo.get("account")
|
||||
name = repo.get("repository")
|
||||
|
||||
if not host or not owner or not name:
|
||||
print("[WARN] Missing host/account/repository; cannot ensure remote repo.")
|
||||
print(f" host={host!r}, account={owner!r}, repository={name!r}")
|
||||
return
|
||||
|
||||
print("------------------------------------------------------------")
|
||||
print(f"[REMOTE ENSURE] {ctx.identifier}")
|
||||
print(f"[REMOTE ENSURE] host: {host}")
|
||||
print("------------------------------------------------------------")
|
||||
|
||||
spec = RepoSpec(
|
||||
host=str(host),
|
||||
owner=str(owner),
|
||||
name=str(name),
|
||||
private=bool(repo.get("private", True)),
|
||||
description=str(repo.get("description", "")),
|
||||
)
|
||||
|
||||
provider_kind = str(repo.get("provider", "")).strip().lower() or None
|
||||
|
||||
try:
|
||||
result = ensure_remote_repo(
|
||||
spec,
|
||||
provider_hint=ProviderHint(kind=provider_kind),
|
||||
options=EnsureOptions(
|
||||
preview=preview,
|
||||
interactive=True,
|
||||
allow_prompt=True,
|
||||
save_prompt_token_to_keyring=True,
|
||||
),
|
||||
)
|
||||
print(f"[REMOTE ENSURE] {result.status.upper()}: {result.message}")
|
||||
if result.url:
|
||||
print(f"[REMOTE ENSURE] URL: {result.url}")
|
||||
except Exception as exc: # noqa: BLE001
|
||||
# Keep action layer resilient
|
||||
print(f"[ERROR] Remote provisioning failed: {exc}")
|
||||
|
||||
print()
|
||||
|
||||
|
||||
def _setup_local_mirrors_for_repo(
|
||||
repo: Repository,
|
||||
repositories_base_dir: str,
|
||||
@@ -16,7 +123,8 @@ def _setup_local_mirrors_for_repo(
|
||||
preview: bool,
|
||||
) -> None:
|
||||
"""
|
||||
Ensure local Git state is sane (currently: 'origin' remote).
|
||||
Local setup:
|
||||
- Ensure 'origin' remote exists and is sane
|
||||
"""
|
||||
ctx = build_context(repo, repositories_base_dir, all_repos)
|
||||
|
||||
@@ -29,103 +137,68 @@ def _setup_local_mirrors_for_repo(
|
||||
print()
|
||||
|
||||
|
||||
def _probe_mirror(url: str, repo_dir: str) -> Tuple[bool, str]:
|
||||
"""
|
||||
Probe a remote mirror by running `git ls-remote <url>`.
|
||||
|
||||
Returns:
|
||||
(True, "") on success,
|
||||
(False, error_message) on failure.
|
||||
|
||||
Wichtig:
|
||||
- Wir werten ausschließlich den Exit-Code aus.
|
||||
- STDERR kann Hinweise/Warnings enthalten und ist NICHT automatisch ein Fehler.
|
||||
"""
|
||||
try:
|
||||
# Wir ignorieren stdout komplett; wichtig ist nur, dass der Befehl ohne
|
||||
# GitError (also Exit-Code 0) durchläuft.
|
||||
run_git(["ls-remote", url], cwd=repo_dir)
|
||||
return True, ""
|
||||
except GitError as exc:
|
||||
return False, str(exc)
|
||||
|
||||
|
||||
def _setup_remote_mirrors_for_repo(
|
||||
repo: Repository,
|
||||
repositories_base_dir: str,
|
||||
all_repos: List[Repository],
|
||||
preview: bool,
|
||||
ensure_remote: bool,
|
||||
) -> None:
|
||||
"""
|
||||
Remote-side setup / validation.
|
||||
|
||||
Aktuell werden nur **nicht-destruktive Checks** gemacht:
|
||||
Default behavior:
|
||||
- Non-destructive checks using `git ls-remote`.
|
||||
|
||||
- Für jeden Mirror (aus config + MIRRORS-Datei, file gewinnt):
|
||||
* `git ls-remote <url>` wird ausgeführt.
|
||||
* Bei Exit-Code 0 → [OK]
|
||||
* Bei Fehler → [WARN] + Details aus der GitError-Exception
|
||||
|
||||
Es werden **keine** Provider-APIs aufgerufen und keine Repos angelegt.
|
||||
Optional behavior:
|
||||
- If ensure_remote=True:
|
||||
* Attempt to create missing repositories via provider API
|
||||
* Uses TokenResolver (ENV -> keyring -> prompt)
|
||||
"""
|
||||
ctx = build_context(repo, repositories_base_dir, all_repos)
|
||||
resolved_m = ctx.resolved_mirrors
|
||||
resolved_mirrors = ctx.resolved_mirrors
|
||||
|
||||
print("------------------------------------------------------------")
|
||||
print(f"[MIRROR SETUP:REMOTE] {ctx.identifier}")
|
||||
print(f"[MIRROR SETUP:REMOTE] dir: {ctx.repo_dir}")
|
||||
print("------------------------------------------------------------")
|
||||
|
||||
if not resolved_m:
|
||||
# Optional: Fallback auf eine heuristisch bestimmte URL, falls wir
|
||||
# irgendwann "automatisch anlegen" implementieren wollen.
|
||||
primary_url = determine_primary_remote_url(repo, resolved_m)
|
||||
if ensure_remote:
|
||||
_ensure_remote_repository(
|
||||
repo,
|
||||
repositories_base_dir=repositories_base_dir,
|
||||
all_repos=all_repos,
|
||||
preview=preview,
|
||||
)
|
||||
|
||||
if not resolved_mirrors:
|
||||
primary_url = determine_primary_remote_url(repo, resolved_mirrors)
|
||||
if not primary_url:
|
||||
print(
|
||||
"[INFO] No mirrors configured (config or MIRRORS file), and no "
|
||||
"primary URL could be derived from provider/account/repository."
|
||||
)
|
||||
print("[INFO] No mirrors configured and no primary URL available.")
|
||||
print()
|
||||
return
|
||||
|
||||
ok, error_message = _probe_mirror(primary_url, ctx.repo_dir)
|
||||
if ok:
|
||||
print(f"[OK] Remote mirror (primary) is reachable: {primary_url}")
|
||||
print(f"[OK] primary: {primary_url}")
|
||||
else:
|
||||
print("[WARN] Primary remote URL is NOT reachable:")
|
||||
print(f" {primary_url}")
|
||||
if error_message:
|
||||
print(" Details:")
|
||||
for line in error_message.splitlines():
|
||||
print(f" {line}")
|
||||
print(f"[WARN] primary: {primary_url}")
|
||||
for line in error_message.splitlines():
|
||||
print(f" {line}")
|
||||
|
||||
print()
|
||||
print(
|
||||
"[INFO] Remote checks are non-destructive and only use `git ls-remote` "
|
||||
"to probe mirror URLs."
|
||||
)
|
||||
print()
|
||||
return
|
||||
|
||||
# Normaler Fall: wir haben benannte Mirrors aus config/MIRRORS
|
||||
for name, url in sorted(resolved_m.items()):
|
||||
for name, url in sorted(resolved_mirrors.items()):
|
||||
ok, error_message = _probe_mirror(url, ctx.repo_dir)
|
||||
if ok:
|
||||
print(f"[OK] Remote mirror '{name}' is reachable: {url}")
|
||||
print(f"[OK] {name}: {url}")
|
||||
else:
|
||||
print(f"[WARN] Remote mirror '{name}' is NOT reachable:")
|
||||
print(f" {url}")
|
||||
if error_message:
|
||||
print(" Details:")
|
||||
for line in error_message.splitlines():
|
||||
print(f" {line}")
|
||||
print(f"[WARN] {name}: {url}")
|
||||
for line in error_message.splitlines():
|
||||
print(f" {line}")
|
||||
|
||||
print()
|
||||
print(
|
||||
"[INFO] Remote checks are non-destructive and only use `git ls-remote` "
|
||||
"to probe mirror URLs."
|
||||
)
|
||||
print()
|
||||
|
||||
|
||||
def setup_mirrors(
|
||||
@@ -135,22 +208,25 @@ def setup_mirrors(
|
||||
preview: bool = False,
|
||||
local: bool = True,
|
||||
remote: bool = True,
|
||||
ensure_remote: bool = False,
|
||||
) -> None:
|
||||
"""
|
||||
Setup mirrors for the selected repositories.
|
||||
|
||||
local:
|
||||
- Configure local Git remotes (currently: ensure 'origin' is present and
|
||||
points to a reasonable URL).
|
||||
- Configure local Git remotes (ensure 'origin' exists).
|
||||
|
||||
remote:
|
||||
- Non-destructive remote checks using `git ls-remote` for each mirror URL.
|
||||
Es werden keine Repositories auf dem Provider angelegt.
|
||||
- Non-destructive remote checks using `git ls-remote`.
|
||||
|
||||
ensure_remote:
|
||||
- If True, attempt to create missing remote repositories via provider APIs.
|
||||
- This is explicit and NEVER enabled implicitly.
|
||||
"""
|
||||
for repo in selected_repos:
|
||||
if local:
|
||||
_setup_local_mirrors_for_repo(
|
||||
repo,
|
||||
repo=repo,
|
||||
repositories_base_dir=repositories_base_dir,
|
||||
all_repos=all_repos,
|
||||
preview=preview,
|
||||
@@ -158,8 +234,9 @@ def setup_mirrors(
|
||||
|
||||
if remote:
|
||||
_setup_remote_mirrors_for_repo(
|
||||
repo,
|
||||
repo=repo,
|
||||
repositories_base_dir=repositories_base_dir,
|
||||
all_repos=all_repos,
|
||||
preview=preview,
|
||||
ensure_remote=ensure_remote,
|
||||
)
|
||||
|
||||
@@ -1,32 +1,30 @@
|
||||
# src/pkgmgr/cli/commands/mirror.py
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
from typing import Any, Dict, List
|
||||
|
||||
from pkgmgr.actions.mirror import (
|
||||
diff_mirrors,
|
||||
list_mirrors,
|
||||
merge_mirrors,
|
||||
setup_mirrors,
|
||||
)
|
||||
from pkgmgr.actions.mirror import diff_mirrors, list_mirrors, merge_mirrors, setup_mirrors
|
||||
from pkgmgr.cli.context import CLIContext
|
||||
|
||||
Repository = Dict[str, Any]
|
||||
|
||||
|
||||
def handle_mirror_command(
|
||||
args,
|
||||
ctx: CLIContext,
|
||||
args: Any,
|
||||
selected: List[Repository],
|
||||
) -> None:
|
||||
"""
|
||||
Entry point for 'pkgmgr mirror' subcommands.
|
||||
|
||||
Subcommands:
|
||||
- mirror list → list configured mirrors
|
||||
- mirror diff → compare config vs MIRRORS file
|
||||
- mirror merge → merge mirrors between config and MIRRORS file
|
||||
- mirror setup → configure local Git + remote placeholders
|
||||
- mirror list
|
||||
- mirror diff
|
||||
- mirror merge
|
||||
- mirror setup
|
||||
- mirror check
|
||||
- mirror provision
|
||||
"""
|
||||
if not selected:
|
||||
print("[INFO] No repositories selected for 'mirror' command.")
|
||||
@@ -34,9 +32,6 @@ def handle_mirror_command(
|
||||
|
||||
subcommand = getattr(args, "subcommand", None)
|
||||
|
||||
# ------------------------------------------------------------
|
||||
# mirror list
|
||||
# ------------------------------------------------------------
|
||||
if subcommand == "list":
|
||||
source = getattr(args, "source", "all")
|
||||
list_mirrors(
|
||||
@@ -47,9 +42,6 @@ def handle_mirror_command(
|
||||
)
|
||||
return
|
||||
|
||||
# ------------------------------------------------------------
|
||||
# mirror diff
|
||||
# ------------------------------------------------------------
|
||||
if subcommand == "diff":
|
||||
diff_mirrors(
|
||||
selected_repos=selected,
|
||||
@@ -58,27 +50,17 @@ def handle_mirror_command(
|
||||
)
|
||||
return
|
||||
|
||||
# ------------------------------------------------------------
|
||||
# mirror merge
|
||||
# ------------------------------------------------------------
|
||||
if subcommand == "merge":
|
||||
source = getattr(args, "source", None)
|
||||
target = getattr(args, "target", None)
|
||||
preview = getattr(args, "preview", False)
|
||||
|
||||
if source == target:
|
||||
print(
|
||||
"[ERROR] For 'mirror merge', source and target "
|
||||
"must differ (one of: config, file)."
|
||||
)
|
||||
print("[ERROR] For 'mirror merge', source and target must differ (config vs file).")
|
||||
sys.exit(2)
|
||||
|
||||
# Config file path can be passed explicitly via --config-path.
|
||||
# If not given, fall back to the global context (if available).
|
||||
explicit_config_path = getattr(args, "config_path", None)
|
||||
user_config_path = explicit_config_path or getattr(
|
||||
ctx, "user_config_path", None
|
||||
)
|
||||
user_config_path = explicit_config_path or getattr(ctx, "user_config_path", None)
|
||||
|
||||
merge_mirrors(
|
||||
selected_repos=selected,
|
||||
@@ -91,26 +73,42 @@ def handle_mirror_command(
|
||||
)
|
||||
return
|
||||
|
||||
# ------------------------------------------------------------
|
||||
# mirror setup
|
||||
# ------------------------------------------------------------
|
||||
if subcommand == "setup":
|
||||
local = getattr(args, "local", False)
|
||||
remote = getattr(args, "remote", False)
|
||||
preview = getattr(args, "preview", False)
|
||||
|
||||
# If neither flag is set → default to both.
|
||||
if not local and not remote:
|
||||
local = True
|
||||
remote = True
|
||||
|
||||
setup_mirrors(
|
||||
selected_repos=selected,
|
||||
repositories_base_dir=ctx.repositories_base_dir,
|
||||
all_repos=ctx.all_repositories,
|
||||
preview=preview,
|
||||
local=local,
|
||||
remote=remote,
|
||||
local=True,
|
||||
remote=False,
|
||||
ensure_remote=False,
|
||||
)
|
||||
return
|
||||
|
||||
if subcommand == "check":
|
||||
preview = getattr(args, "preview", False)
|
||||
setup_mirrors(
|
||||
selected_repos=selected,
|
||||
repositories_base_dir=ctx.repositories_base_dir,
|
||||
all_repos=ctx.all_repositories,
|
||||
preview=preview,
|
||||
local=False,
|
||||
remote=True,
|
||||
ensure_remote=False,
|
||||
)
|
||||
return
|
||||
|
||||
if subcommand == "provision":
|
||||
preview = getattr(args, "preview", False)
|
||||
setup_mirrors(
|
||||
selected_repos=selected,
|
||||
repositories_base_dir=ctx.repositories_base_dir,
|
||||
all_repos=ctx.all_repositories,
|
||||
preview=preview,
|
||||
local=False,
|
||||
remote=True,
|
||||
ensure_remote=True,
|
||||
)
|
||||
return
|
||||
|
||||
|
||||
@@ -1,96 +1,134 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# src/pkgmgr/cli/parser/common.py
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
from typing import Optional, Tuple
|
||||
|
||||
|
||||
class SortedSubParsersAction(argparse._SubParsersAction):
|
||||
"""
|
||||
Subparsers action that keeps choices sorted alphabetically.
|
||||
Subparsers action that keeps subcommands sorted alphabetically.
|
||||
"""
|
||||
|
||||
def add_parser(self, name, **kwargs):
|
||||
parser = super().add_parser(name, **kwargs)
|
||||
# Sort choices alphabetically by dest (subcommand name)
|
||||
self._choices_actions.sort(key=lambda a: a.dest)
|
||||
return parser
|
||||
|
||||
|
||||
def _has_action(
|
||||
parser: argparse.ArgumentParser,
|
||||
*,
|
||||
positional: Optional[str] = None,
|
||||
options: Tuple[str, ...] = (),
|
||||
) -> bool:
|
||||
"""
|
||||
Check whether the parser already has an action.
|
||||
|
||||
- positional: name of a positional argument (e.g. "identifiers")
|
||||
- options: option strings (e.g. "--preview", "-q")
|
||||
"""
|
||||
for action in parser._actions:
|
||||
if positional and action.dest == positional:
|
||||
return True
|
||||
if options and any(opt in action.option_strings for opt in options):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def _add_positional_if_missing(
|
||||
parser: argparse.ArgumentParser,
|
||||
name: str,
|
||||
**kwargs,
|
||||
) -> None:
|
||||
"""Safely add a positional argument."""
|
||||
if _has_action(parser, positional=name):
|
||||
return
|
||||
parser.add_argument(name, **kwargs)
|
||||
|
||||
|
||||
def _add_option_if_missing(
|
||||
parser: argparse.ArgumentParser,
|
||||
*option_strings: str,
|
||||
**kwargs,
|
||||
) -> None:
|
||||
"""Safely add an optional argument."""
|
||||
if _has_action(parser, options=tuple(option_strings)):
|
||||
return
|
||||
parser.add_argument(*option_strings, **kwargs)
|
||||
|
||||
|
||||
def add_identifier_arguments(subparser: argparse.ArgumentParser) -> None:
|
||||
"""
|
||||
Common identifier / selection arguments for many subcommands.
|
||||
|
||||
Selection modes (mutual intent, not hard-enforced):
|
||||
- identifiers (positional): select by alias / provider/account/repo
|
||||
- --all: select all repositories
|
||||
- --category / --string / --tag: filter-based selection on top
|
||||
of the full repository set
|
||||
"""
|
||||
subparser.add_argument(
|
||||
_add_positional_if_missing(
|
||||
subparser,
|
||||
"identifiers",
|
||||
nargs="*",
|
||||
help=(
|
||||
"Identifier(s) for repositories. "
|
||||
"Default: Repository of current folder."
|
||||
"Default: repository of the current working directory."
|
||||
),
|
||||
)
|
||||
subparser.add_argument(
|
||||
|
||||
_add_option_if_missing(
|
||||
subparser,
|
||||
"--all",
|
||||
action="store_true",
|
||||
default=False,
|
||||
help=(
|
||||
"Apply the subcommand to all repositories in the config. "
|
||||
"Some subcommands ask for confirmation. If you want to give this "
|
||||
"confirmation for all repositories, pipe 'yes'. E.g: "
|
||||
"yes | pkgmgr {subcommand} --all"
|
||||
"Pipe 'yes' to auto-confirm. Example:\n"
|
||||
" yes | pkgmgr <command> --all"
|
||||
),
|
||||
)
|
||||
subparser.add_argument(
|
||||
|
||||
_add_option_if_missing(
|
||||
subparser,
|
||||
"--category",
|
||||
nargs="+",
|
||||
default=[],
|
||||
help=(
|
||||
"Filter repositories by category patterns derived from config "
|
||||
"filenames or repo metadata (use filename without .yml/.yaml, "
|
||||
"or /regex/ to use a regular expression)."
|
||||
),
|
||||
help="Filter repositories by category (supports /regex/).",
|
||||
)
|
||||
subparser.add_argument(
|
||||
|
||||
_add_option_if_missing(
|
||||
subparser,
|
||||
"--string",
|
||||
default="",
|
||||
help=(
|
||||
"Filter repositories whose identifier / name / path contains this "
|
||||
"substring (case-insensitive). Use /regex/ for regular expressions."
|
||||
),
|
||||
help="Filter repositories by substring or /regex/.",
|
||||
)
|
||||
subparser.add_argument(
|
||||
|
||||
_add_option_if_missing(
|
||||
subparser,
|
||||
"--tag",
|
||||
action="append",
|
||||
default=[],
|
||||
help=(
|
||||
"Filter repositories by tag. Matches tags from the repository "
|
||||
"collector and category tags. Use /regex/ for regular expressions."
|
||||
),
|
||||
help="Filter repositories by tag (supports /regex/).",
|
||||
)
|
||||
subparser.add_argument(
|
||||
|
||||
_add_option_if_missing(
|
||||
subparser,
|
||||
"--preview",
|
||||
action="store_true",
|
||||
help="Preview changes without executing commands",
|
||||
help="Preview changes without executing commands.",
|
||||
)
|
||||
subparser.add_argument(
|
||||
|
||||
_add_option_if_missing(
|
||||
subparser,
|
||||
"--list",
|
||||
action="store_true",
|
||||
help="List affected repositories (with preview or status)",
|
||||
help="List affected repositories.",
|
||||
)
|
||||
subparser.add_argument(
|
||||
|
||||
_add_option_if_missing(
|
||||
subparser,
|
||||
"-a",
|
||||
"--args",
|
||||
nargs=argparse.REMAINDER,
|
||||
dest="extra_args",
|
||||
help="Additional parameters to be attached.",
|
||||
nargs=argparse.REMAINDER,
|
||||
default=[],
|
||||
help="Additional parameters to be attached.",
|
||||
)
|
||||
|
||||
|
||||
@@ -99,29 +137,34 @@ def add_install_update_arguments(subparser: argparse.ArgumentParser) -> None:
|
||||
Common arguments for install/update commands.
|
||||
"""
|
||||
add_identifier_arguments(subparser)
|
||||
subparser.add_argument(
|
||||
|
||||
_add_option_if_missing(
|
||||
subparser,
|
||||
"-q",
|
||||
"--quiet",
|
||||
action="store_true",
|
||||
help="Suppress warnings and info messages",
|
||||
help="Suppress warnings and info messages.",
|
||||
)
|
||||
subparser.add_argument(
|
||||
|
||||
_add_option_if_missing(
|
||||
subparser,
|
||||
"--no-verification",
|
||||
action="store_true",
|
||||
default=False,
|
||||
help="Disable verification via commit/gpg",
|
||||
help="Disable verification via commit / GPG.",
|
||||
)
|
||||
subparser.add_argument(
|
||||
|
||||
_add_option_if_missing(
|
||||
subparser,
|
||||
"--dependencies",
|
||||
action="store_true",
|
||||
help="Also pull and update dependencies",
|
||||
help="Also pull and update dependencies.",
|
||||
)
|
||||
subparser.add_argument(
|
||||
|
||||
_add_option_if_missing(
|
||||
subparser,
|
||||
"--clone-mode",
|
||||
choices=["ssh", "https", "shallow"],
|
||||
default="ssh",
|
||||
help=(
|
||||
"Specify the clone mode: ssh, https, or shallow "
|
||||
"(HTTPS shallow clone; default: ssh)"
|
||||
),
|
||||
help="Specify clone mode (default: ssh).",
|
||||
)
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
# src/pkgmgr/cli/parser/mirror_cmd.py
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
@@ -8,103 +9,55 @@ import argparse
|
||||
from .common import add_identifier_arguments
|
||||
|
||||
|
||||
def add_mirror_subparsers(
|
||||
subparsers: argparse._SubParsersAction,
|
||||
) -> None:
|
||||
"""
|
||||
Register mirror command and its subcommands (list, diff, merge, setup).
|
||||
"""
|
||||
def add_mirror_subparsers(subparsers: argparse._SubParsersAction) -> None:
|
||||
mirror_parser = subparsers.add_parser(
|
||||
"mirror",
|
||||
help="Mirror-related utilities (list, diff, merge, setup)",
|
||||
help="Mirror-related utilities (list, diff, merge, setup, check, provision)",
|
||||
)
|
||||
mirror_subparsers = mirror_parser.add_subparsers(
|
||||
dest="subcommand",
|
||||
help="Mirror subcommands",
|
||||
metavar="SUBCOMMAND",
|
||||
required=True,
|
||||
)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# mirror list
|
||||
# ------------------------------------------------------------------
|
||||
mirror_list = mirror_subparsers.add_parser(
|
||||
"list",
|
||||
help="List configured mirrors for repositories",
|
||||
)
|
||||
mirror_list = mirror_subparsers.add_parser("list", help="List configured mirrors for repositories")
|
||||
add_identifier_arguments(mirror_list)
|
||||
mirror_list.add_argument(
|
||||
"--source",
|
||||
choices=["all", "config", "file", "resolved"],
|
||||
choices=["config", "file", "all"],
|
||||
default="all",
|
||||
help="Which mirror source to show.",
|
||||
)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# mirror diff
|
||||
# ------------------------------------------------------------------
|
||||
mirror_diff = mirror_subparsers.add_parser(
|
||||
"diff",
|
||||
help="Show differences between config mirrors and MIRRORS file",
|
||||
)
|
||||
mirror_diff = mirror_subparsers.add_parser("diff", help="Show differences between config mirrors and MIRRORS file")
|
||||
add_identifier_arguments(mirror_diff)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# mirror merge {config,file} {config,file}
|
||||
# ------------------------------------------------------------------
|
||||
mirror_merge = mirror_subparsers.add_parser(
|
||||
"merge",
|
||||
help=(
|
||||
"Merge mirrors between config and MIRRORS file "
|
||||
"(example: pkgmgr mirror merge config file --all)"
|
||||
),
|
||||
help="Merge mirrors between config and MIRRORS file (example: pkgmgr mirror merge config file --all)",
|
||||
)
|
||||
|
||||
# First define merge direction positionals, then selection args.
|
||||
mirror_merge.add_argument(
|
||||
"source",
|
||||
choices=["config", "file"],
|
||||
help="Source of mirrors.",
|
||||
)
|
||||
mirror_merge.add_argument(
|
||||
"target",
|
||||
choices=["config", "file"],
|
||||
help="Target of mirrors.",
|
||||
)
|
||||
|
||||
# Selection / filter / preview arguments
|
||||
mirror_merge.add_argument("source", choices=["config", "file"], help="Source of mirrors.")
|
||||
mirror_merge.add_argument("target", choices=["config", "file"], help="Target of mirrors.")
|
||||
add_identifier_arguments(mirror_merge)
|
||||
|
||||
mirror_merge.add_argument(
|
||||
"--config-path",
|
||||
help=(
|
||||
"Path to the user config file to update. "
|
||||
"If omitted, the global config path is used."
|
||||
),
|
||||
help="Path to the user config file to update. If omitted, the global config path is used.",
|
||||
)
|
||||
# Note: --preview, --all, --category, --tag, --list, etc. are provided
|
||||
# by add_identifier_arguments().
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# mirror setup
|
||||
# ------------------------------------------------------------------
|
||||
mirror_setup = mirror_subparsers.add_parser(
|
||||
"setup",
|
||||
help=(
|
||||
"Setup mirror configuration for repositories.\n"
|
||||
" --local → configure local Git (remotes, pushurls)\n"
|
||||
" --remote → create remote repositories if missing\n"
|
||||
"Default: both local and remote."
|
||||
),
|
||||
help="Configure local Git remotes and push URLs (origin, pushurl list).",
|
||||
)
|
||||
add_identifier_arguments(mirror_setup)
|
||||
mirror_setup.add_argument(
|
||||
"--local",
|
||||
action="store_true",
|
||||
help="Only configure the local Git repository.",
|
||||
|
||||
mirror_check = mirror_subparsers.add_parser(
|
||||
"check",
|
||||
help="Check remote mirror reachability (git ls-remote). Read-only.",
|
||||
)
|
||||
mirror_setup.add_argument(
|
||||
"--remote",
|
||||
action="store_true",
|
||||
help="Only operate on remote repositories.",
|
||||
add_identifier_arguments(mirror_check)
|
||||
|
||||
mirror_provision = mirror_subparsers.add_parser(
|
||||
"provision",
|
||||
help="Provision remote repositories via provider APIs (create missing repos).",
|
||||
)
|
||||
# Note: --preview also comes from add_identifier_arguments().
|
||||
add_identifier_arguments(mirror_provision)
|
||||
|
||||
21
src/pkgmgr/core/credentials/__init__.py
Normal file
21
src/pkgmgr/core/credentials/__init__.py
Normal file
@@ -0,0 +1,21 @@
|
||||
# src/pkgmgr/core/credentials/__init__.py
|
||||
"""Credential resolution for provider APIs."""
|
||||
|
||||
from .resolver import ResolutionOptions, TokenResolver
|
||||
from .types import (
|
||||
CredentialError,
|
||||
KeyringUnavailableError,
|
||||
NoCredentialsError,
|
||||
TokenRequest,
|
||||
TokenResult,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"TokenResolver",
|
||||
"ResolutionOptions",
|
||||
"CredentialError",
|
||||
"NoCredentialsError",
|
||||
"KeyringUnavailableError",
|
||||
"TokenRequest",
|
||||
"TokenResult",
|
||||
]
|
||||
11
src/pkgmgr/core/credentials/providers/__init__.py
Normal file
11
src/pkgmgr/core/credentials/providers/__init__.py
Normal file
@@ -0,0 +1,11 @@
|
||||
"""Credential providers used by TokenResolver."""
|
||||
|
||||
from .env import EnvTokenProvider
|
||||
from .keyring import KeyringTokenProvider
|
||||
from .prompt import PromptTokenProvider
|
||||
|
||||
__all__ = [
|
||||
"EnvTokenProvider",
|
||||
"KeyringTokenProvider",
|
||||
"PromptTokenProvider",
|
||||
]
|
||||
23
src/pkgmgr/core/credentials/providers/env.py
Normal file
23
src/pkgmgr/core/credentials/providers/env.py
Normal file
@@ -0,0 +1,23 @@
|
||||
# src/pkgmgr/core/credentials/providers/env.py
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional
|
||||
|
||||
from ..store_keys import env_var_candidates
|
||||
from ..types import TokenRequest, TokenResult
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class EnvTokenProvider:
|
||||
"""Resolve tokens from environment variables."""
|
||||
|
||||
source_name: str = "env"
|
||||
|
||||
def get(self, request: TokenRequest) -> Optional[TokenResult]:
|
||||
for key in env_var_candidates(request.provider_kind, request.host, request.owner):
|
||||
val = os.environ.get(key)
|
||||
if val:
|
||||
return TokenResult(token=val.strip(), source=self.source_name)
|
||||
return None
|
||||
39
src/pkgmgr/core/credentials/providers/keyring.py
Normal file
39
src/pkgmgr/core/credentials/providers/keyring.py
Normal file
@@ -0,0 +1,39 @@
|
||||
# src/pkgmgr/core/credentials/providers/keyring.py
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional
|
||||
|
||||
from ..store_keys import build_keyring_key
|
||||
from ..types import KeyringUnavailableError, TokenRequest, TokenResult
|
||||
|
||||
|
||||
def _import_keyring():
|
||||
try:
|
||||
import keyring # type: ignore
|
||||
|
||||
return keyring
|
||||
except Exception as exc: # noqa: BLE001
|
||||
raise KeyringUnavailableError(
|
||||
"python-keyring is not available or no backend is configured."
|
||||
) from exc
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class KeyringTokenProvider:
|
||||
"""Resolve/store tokens from/to OS keyring via python-keyring."""
|
||||
|
||||
source_name: str = "keyring"
|
||||
|
||||
def get(self, request: TokenRequest) -> Optional[TokenResult]:
|
||||
keyring = _import_keyring()
|
||||
key = build_keyring_key(request.provider_kind, request.host, request.owner)
|
||||
token = keyring.get_password(key.service, key.username)
|
||||
if token:
|
||||
return TokenResult(token=token.strip(), source=self.source_name)
|
||||
return None
|
||||
|
||||
def set(self, request: TokenRequest, token: str) -> None:
|
||||
keyring = _import_keyring()
|
||||
key = build_keyring_key(request.provider_kind, request.host, request.owner)
|
||||
keyring.set_password(key.service, key.username, token)
|
||||
32
src/pkgmgr/core/credentials/providers/prompt.py
Normal file
32
src/pkgmgr/core/credentials/providers/prompt.py
Normal file
@@ -0,0 +1,32 @@
|
||||
# src/pkgmgr/core/credentials/providers/prompt.py
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
from dataclasses import dataclass
|
||||
from getpass import getpass
|
||||
from typing import Optional
|
||||
|
||||
from ..types import TokenRequest, TokenResult
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class PromptTokenProvider:
|
||||
"""Interactively prompt for a token.
|
||||
|
||||
Only used when:
|
||||
- interactive mode is enabled
|
||||
- stdin is a TTY
|
||||
"""
|
||||
|
||||
source_name: str = "prompt"
|
||||
|
||||
def get(self, request: TokenRequest) -> Optional[TokenResult]:
|
||||
if not sys.stdin.isatty():
|
||||
return None
|
||||
|
||||
owner_info = f" (owner: {request.owner})" if request.owner else ""
|
||||
prompt = f"Enter API token for {request.provider_kind} on {request.host}{owner_info}: "
|
||||
token = (getpass(prompt) or "").strip()
|
||||
if not token:
|
||||
return None
|
||||
return TokenResult(token=token, source=self.source_name)
|
||||
71
src/pkgmgr/core/credentials/resolver.py
Normal file
71
src/pkgmgr/core/credentials/resolver.py
Normal file
@@ -0,0 +1,71 @@
|
||||
# src/pkgmgr/core/credentials/resolver.py
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional
|
||||
|
||||
from .providers.env import EnvTokenProvider
|
||||
from .providers.keyring import KeyringTokenProvider
|
||||
from .providers.prompt import PromptTokenProvider
|
||||
from .types import NoCredentialsError, TokenRequest, TokenResult
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ResolutionOptions:
|
||||
"""Controls token resolution behavior."""
|
||||
|
||||
interactive: bool = True
|
||||
allow_prompt: bool = True
|
||||
save_prompt_token_to_keyring: bool = True
|
||||
|
||||
|
||||
class TokenResolver:
|
||||
"""Resolve tokens from multiple sources (ENV -> Keyring -> Prompt)."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._env = EnvTokenProvider()
|
||||
self._keyring = KeyringTokenProvider()
|
||||
self._prompt = PromptTokenProvider()
|
||||
|
||||
def get_token(
|
||||
self,
|
||||
provider_kind: str,
|
||||
host: str,
|
||||
owner: Optional[str] = None,
|
||||
options: Optional[ResolutionOptions] = None,
|
||||
) -> TokenResult:
|
||||
opts = options or ResolutionOptions()
|
||||
request = TokenRequest(provider_kind=provider_kind, host=host, owner=owner)
|
||||
|
||||
# 1) ENV
|
||||
env_res = self._env.get(request)
|
||||
if env_res:
|
||||
return env_res
|
||||
|
||||
# 2) Keyring
|
||||
try:
|
||||
kr_res = self._keyring.get(request)
|
||||
if kr_res:
|
||||
return kr_res
|
||||
except Exception:
|
||||
# Keyring missing/unavailable: ignore to allow prompt (workstations)
|
||||
# or to fail cleanly below (headless CI without prompt).
|
||||
pass
|
||||
|
||||
# 3) Prompt (optional)
|
||||
if opts.interactive and opts.allow_prompt:
|
||||
prompt_res = self._prompt.get(request)
|
||||
if prompt_res:
|
||||
if opts.save_prompt_token_to_keyring:
|
||||
try:
|
||||
self._keyring.set(request, prompt_res.token)
|
||||
except Exception:
|
||||
# If keyring cannot store, still use token for this run.
|
||||
pass
|
||||
return prompt_res
|
||||
|
||||
raise NoCredentialsError(
|
||||
f"No token available for {provider_kind}@{host}"
|
||||
+ (f" (owner: {owner})" if owner else "")
|
||||
+ ". Provide it via environment variable or keyring."
|
||||
)
|
||||
54
src/pkgmgr/core/credentials/store_keys.py
Normal file
54
src/pkgmgr/core/credentials/store_keys.py
Normal file
@@ -0,0 +1,54 @@
|
||||
# src/pkgmgr/core/credentials/store_keys.py
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class KeyringKey:
|
||||
"""Keyring address for a token."""
|
||||
|
||||
service: str
|
||||
username: str
|
||||
|
||||
|
||||
def build_keyring_key(provider_kind: str, host: str, owner: Optional[str]) -> KeyringKey:
|
||||
"""Build a stable keyring key.
|
||||
|
||||
- service: "pkgmgr:<provider>"
|
||||
- username: "<host>|<owner>" or "<host>|-"
|
||||
"""
|
||||
provider_kind = str(provider_kind).strip().lower()
|
||||
host = str(host).strip()
|
||||
owner_part = (str(owner).strip() if owner else "-")
|
||||
return KeyringKey(service=f"pkgmgr:{provider_kind}", username=f"{host}|{owner_part}")
|
||||
|
||||
|
||||
def env_var_candidates(provider_kind: str, host: str, owner: Optional[str]) -> list[str]:
|
||||
"""Return a list of environment variable names to try.
|
||||
|
||||
Order is from most specific to most generic.
|
||||
"""
|
||||
kind = re_sub_non_alnum(str(provider_kind).strip().upper())
|
||||
host_norm = re_sub_non_alnum(str(host).strip().upper())
|
||||
candidates: list[str] = []
|
||||
|
||||
if owner:
|
||||
owner_norm = re_sub_non_alnum(str(owner).strip().upper())
|
||||
candidates.append(f"PKGMGR_{kind}_TOKEN_{host_norm}_{owner_norm}")
|
||||
candidates.append(f"PKGMGR_TOKEN_{kind}_{host_norm}_{owner_norm}")
|
||||
|
||||
candidates.append(f"PKGMGR_{kind}_TOKEN_{host_norm}")
|
||||
candidates.append(f"PKGMGR_TOKEN_{kind}_{host_norm}")
|
||||
candidates.append(f"PKGMGR_{kind}_TOKEN")
|
||||
candidates.append(f"PKGMGR_TOKEN_{kind}")
|
||||
candidates.append("PKGMGR_TOKEN")
|
||||
return candidates
|
||||
|
||||
|
||||
def re_sub_non_alnum(value: str) -> str:
|
||||
"""Normalize to an uppercase env-var friendly token (A-Z0-9_)."""
|
||||
import re
|
||||
|
||||
return re.sub(r"[^A-Z0-9]+", "_", value).strip("_")
|
||||
34
src/pkgmgr/core/credentials/types.py
Normal file
34
src/pkgmgr/core/credentials/types.py
Normal file
@@ -0,0 +1,34 @@
|
||||
# src/pkgmgr/core/credentials/types.py
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional
|
||||
|
||||
|
||||
class CredentialError(RuntimeError):
|
||||
"""Base class for credential resolution errors."""
|
||||
|
||||
|
||||
class NoCredentialsError(CredentialError):
|
||||
"""Raised when no usable credential could be resolved."""
|
||||
|
||||
|
||||
class KeyringUnavailableError(CredentialError):
|
||||
"""Raised when keyring is requested but no backend is available."""
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class TokenRequest:
|
||||
"""Parameters describing which token we need."""
|
||||
|
||||
provider_kind: str # e.g. "gitea", "github"
|
||||
host: str # e.g. "git.example.org" or "github.com"
|
||||
owner: Optional[str] = None # optional org/user
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class TokenResult:
|
||||
"""A resolved token plus metadata about its source."""
|
||||
|
||||
token: str
|
||||
source: str # "env" | "keyring" | "prompt"
|
||||
14
src/pkgmgr/core/remote_provisioning/__init__.py
Normal file
14
src/pkgmgr/core/remote_provisioning/__init__.py
Normal file
@@ -0,0 +1,14 @@
|
||||
# src/pkgmgr/core/remote_provisioning/__init__.py
|
||||
"""Remote repository provisioning (ensure remote repo exists)."""
|
||||
|
||||
from .ensure import ensure_remote_repo
|
||||
from .registry import ProviderRegistry
|
||||
from .types import EnsureResult, ProviderHint, RepoSpec
|
||||
|
||||
__all__ = [
|
||||
"ensure_remote_repo",
|
||||
"RepoSpec",
|
||||
"EnsureResult",
|
||||
"ProviderHint",
|
||||
"ProviderRegistry",
|
||||
]
|
||||
97
src/pkgmgr/core/remote_provisioning/ensure.py
Normal file
97
src/pkgmgr/core/remote_provisioning/ensure.py
Normal file
@@ -0,0 +1,97 @@
|
||||
# src/pkgmgr/core/remote_provisioning/ensure.py
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional
|
||||
|
||||
from pkgmgr.core.credentials.resolver import ResolutionOptions, TokenResolver
|
||||
|
||||
from .http.errors import HttpError
|
||||
from .registry import ProviderRegistry
|
||||
from .types import (
|
||||
AuthError,
|
||||
EnsureResult,
|
||||
NetworkError,
|
||||
PermissionError,
|
||||
ProviderHint,
|
||||
RepoSpec,
|
||||
UnsupportedProviderError,
|
||||
)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class EnsureOptions:
|
||||
"""Options controlling remote provisioning."""
|
||||
|
||||
preview: bool = False
|
||||
interactive: bool = True
|
||||
allow_prompt: bool = True
|
||||
save_prompt_token_to_keyring: bool = True
|
||||
|
||||
|
||||
def _raise_mapped_http_error(exc: HttpError, host: str) -> None:
|
||||
"""Map HttpError into domain-specific error types."""
|
||||
if exc.status == 0:
|
||||
raise NetworkError(f"Network error while talking to {host}: {exc}") from exc
|
||||
if exc.status == 401:
|
||||
raise AuthError(f"Authentication failed for {host} (401).") from exc
|
||||
if exc.status == 403:
|
||||
raise PermissionError(f"Permission denied for {host} (403).") from exc
|
||||
|
||||
raise NetworkError(
|
||||
f"HTTP error from {host}: status={exc.status}, message={exc}, body={exc.body}"
|
||||
) from exc
|
||||
|
||||
|
||||
def ensure_remote_repo(
|
||||
spec: RepoSpec,
|
||||
provider_hint: Optional[ProviderHint] = None,
|
||||
options: Optional[EnsureOptions] = None,
|
||||
registry: Optional[ProviderRegistry] = None,
|
||||
token_resolver: Optional[TokenResolver] = None,
|
||||
) -> EnsureResult:
|
||||
"""Ensure that the remote repository exists (create if missing).
|
||||
|
||||
- Uses TokenResolver (ENV -> keyring -> prompt)
|
||||
- Selects provider via ProviderRegistry (or provider_hint override)
|
||||
- Respects preview mode (no remote changes)
|
||||
- Maps HTTP errors to domain-specific errors
|
||||
"""
|
||||
opts = options or EnsureOptions()
|
||||
reg = registry or ProviderRegistry.default()
|
||||
resolver = token_resolver or TokenResolver()
|
||||
|
||||
provider = reg.resolve(spec.host)
|
||||
if provider_hint and provider_hint.kind:
|
||||
forced = provider_hint.kind.strip().lower()
|
||||
provider = next(
|
||||
(p for p in reg.providers if getattr(p, "kind", "").lower() == forced),
|
||||
None,
|
||||
)
|
||||
|
||||
if provider is None:
|
||||
raise UnsupportedProviderError(f"No provider matched host: {spec.host}")
|
||||
|
||||
token_opts = ResolutionOptions(
|
||||
interactive=opts.interactive,
|
||||
allow_prompt=opts.allow_prompt,
|
||||
save_prompt_token_to_keyring=opts.save_prompt_token_to_keyring,
|
||||
)
|
||||
token = resolver.get_token(
|
||||
provider_kind=getattr(provider, "kind", "unknown"),
|
||||
host=spec.host,
|
||||
owner=spec.owner,
|
||||
options=token_opts,
|
||||
)
|
||||
|
||||
if opts.preview:
|
||||
return EnsureResult(
|
||||
status="skipped",
|
||||
message="Preview mode: no remote changes performed.",
|
||||
)
|
||||
|
||||
try:
|
||||
return provider.ensure_repo(token.token, spec)
|
||||
except HttpError as exc:
|
||||
_raise_mapped_http_error(exc, host=spec.host)
|
||||
return EnsureResult(status="failed", message="Unreachable error mapping.")
|
||||
5
src/pkgmgr/core/remote_provisioning/http/__init__.py
Normal file
5
src/pkgmgr/core/remote_provisioning/http/__init__.py
Normal file
@@ -0,0 +1,5 @@
|
||||
# src/pkgmgr/core/remote_provisioning/http/__init__.py
|
||||
from .client import HttpClient, HttpResponse
|
||||
from .errors import HttpError
|
||||
|
||||
__all__ = ["HttpClient", "HttpResponse", "HttpError"]
|
||||
69
src/pkgmgr/core/remote_provisioning/http/client.py
Normal file
69
src/pkgmgr/core/remote_provisioning/http/client.py
Normal file
@@ -0,0 +1,69 @@
|
||||
# src/pkgmgr/core/remote_provisioning/http/client.py
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import ssl
|
||||
import urllib.error
|
||||
import urllib.request
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
from .errors import HttpError
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class HttpResponse:
|
||||
status: int
|
||||
text: str
|
||||
json: Optional[Dict[str, Any]] = None
|
||||
|
||||
|
||||
class HttpClient:
|
||||
"""Tiny HTTP client (stdlib) with JSON support."""
|
||||
|
||||
def __init__(self, timeout_s: int = 15) -> None:
|
||||
self._timeout_s = int(timeout_s)
|
||||
|
||||
def request_json(
|
||||
self,
|
||||
method: str,
|
||||
url: str,
|
||||
headers: Optional[Dict[str, str]] = None,
|
||||
payload: Optional[Dict[str, Any]] = None,
|
||||
) -> HttpResponse:
|
||||
data: Optional[bytes] = None
|
||||
final_headers: Dict[str, str] = dict(headers or {})
|
||||
|
||||
if payload is not None:
|
||||
data = json.dumps(payload).encode("utf-8")
|
||||
final_headers.setdefault("Content-Type", "application/json")
|
||||
|
||||
req = urllib.request.Request(url=url, data=data, method=method.upper())
|
||||
for k, v in final_headers.items():
|
||||
req.add_header(k, v)
|
||||
|
||||
try:
|
||||
with urllib.request.urlopen(
|
||||
req,
|
||||
timeout=self._timeout_s,
|
||||
context=ssl.create_default_context(),
|
||||
) as resp:
|
||||
raw = resp.read().decode("utf-8", errors="replace")
|
||||
|
||||
parsed: Optional[Dict[str, Any]] = None
|
||||
if raw:
|
||||
try:
|
||||
loaded = json.loads(raw)
|
||||
parsed = loaded if isinstance(loaded, dict) else None
|
||||
except Exception:
|
||||
parsed = None
|
||||
|
||||
return HttpResponse(status=int(resp.status), text=raw, json=parsed)
|
||||
except urllib.error.HTTPError as exc:
|
||||
try:
|
||||
body = exc.read().decode("utf-8", errors="replace")
|
||||
except Exception:
|
||||
body = ""
|
||||
raise HttpError(status=int(exc.code), message=str(exc), body=body) from exc
|
||||
except urllib.error.URLError as exc:
|
||||
raise HttpError(status=0, message=str(exc), body="") from exc
|
||||
9
src/pkgmgr/core/remote_provisioning/http/errors.py
Normal file
9
src/pkgmgr/core/remote_provisioning/http/errors.py
Normal file
@@ -0,0 +1,9 @@
|
||||
# src/pkgmgr/core/remote_provisioning/http/errors.py
|
||||
from __future__ import annotations
|
||||
|
||||
|
||||
class HttpError(RuntimeError):
|
||||
def __init__(self, status: int, message: str, body: str = "") -> None:
|
||||
super().__init__(message)
|
||||
self.status = status
|
||||
self.body = body
|
||||
@@ -0,0 +1,6 @@
|
||||
# src/pkgmgr/core/remote_provisioning/providers/__init__.py
|
||||
from .base import RemoteProvider
|
||||
from .gitea import GiteaProvider
|
||||
from .github import GitHubProvider
|
||||
|
||||
__all__ = ["RemoteProvider", "GiteaProvider", "GitHubProvider"]
|
||||
36
src/pkgmgr/core/remote_provisioning/providers/base.py
Normal file
36
src/pkgmgr/core/remote_provisioning/providers/base.py
Normal file
@@ -0,0 +1,36 @@
|
||||
# src/pkgmgr/core/remote_provisioning/providers/base.py
|
||||
from __future__ import annotations
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
from ..types import EnsureResult, RepoSpec
|
||||
|
||||
|
||||
class RemoteProvider(ABC):
|
||||
"""Provider interface for remote repo provisioning."""
|
||||
|
||||
kind: str
|
||||
|
||||
@abstractmethod
|
||||
def can_handle(self, host: str) -> bool:
|
||||
"""Return True if this provider implementation matches the host."""
|
||||
|
||||
@abstractmethod
|
||||
def repo_exists(self, token: str, spec: RepoSpec) -> bool:
|
||||
"""Return True if repo exists and is accessible."""
|
||||
|
||||
@abstractmethod
|
||||
def create_repo(self, token: str, spec: RepoSpec) -> EnsureResult:
|
||||
"""Create a repository (owner may be user or org)."""
|
||||
|
||||
def ensure_repo(self, token: str, spec: RepoSpec) -> EnsureResult:
|
||||
if self.repo_exists(token, spec):
|
||||
return EnsureResult(status="exists", message="Repository exists.")
|
||||
return self.create_repo(token, spec)
|
||||
|
||||
@staticmethod
|
||||
def _api_base(host: str) -> str:
|
||||
# Default to https. If you need http for local dev, store host as "http://..."
|
||||
if host.startswith("http://") or host.startswith("https://"):
|
||||
return host.rstrip("/")
|
||||
return f"https://{host}".rstrip("/")
|
||||
106
src/pkgmgr/core/remote_provisioning/providers/gitea.py
Normal file
106
src/pkgmgr/core/remote_provisioning/providers/gitea.py
Normal file
@@ -0,0 +1,106 @@
|
||||
# src/pkgmgr/core/remote_provisioning/providers/gitea.py
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any, Dict
|
||||
|
||||
from ..http.client import HttpClient
|
||||
from ..http.errors import HttpError
|
||||
from ..types import EnsureResult, RepoSpec
|
||||
from .base import RemoteProvider
|
||||
|
||||
|
||||
class GiteaProvider(RemoteProvider):
|
||||
"""Gitea provider using Gitea REST API v1."""
|
||||
|
||||
kind = "gitea"
|
||||
|
||||
def __init__(self, timeout_s: int = 15) -> None:
|
||||
self._http = HttpClient(timeout_s=timeout_s)
|
||||
|
||||
def can_handle(self, host: str) -> bool:
|
||||
"""
|
||||
Heuristic host match:
|
||||
- Acts as a fallback provider for self-hosted setups.
|
||||
- Must NOT claim GitHub hosts.
|
||||
- If you add more providers later, tighten this heuristic or use provider hints.
|
||||
"""
|
||||
h = host.lower()
|
||||
if h in ("github.com", "api.github.com") or h.endswith(".github.com"):
|
||||
return False
|
||||
return True
|
||||
|
||||
def _headers(self, token: str) -> Dict[str, str]:
|
||||
"""
|
||||
Gitea commonly supports:
|
||||
Authorization: token <TOKEN>
|
||||
Newer versions may also accept Bearer tokens, but "token" is broadly compatible.
|
||||
"""
|
||||
return {
|
||||
"Authorization": f"token {token}",
|
||||
"Accept": "application/json",
|
||||
"User-Agent": "pkgmgr",
|
||||
}
|
||||
|
||||
def repo_exists(self, token: str, spec: RepoSpec) -> bool:
|
||||
base = self._api_base(spec.host)
|
||||
url = f"{base}/api/v1/repos/{spec.owner}/{spec.name}"
|
||||
try:
|
||||
resp = self._http.request_json("GET", url, headers=self._headers(token))
|
||||
return 200 <= resp.status < 300
|
||||
except HttpError as exc:
|
||||
if exc.status == 404:
|
||||
return False
|
||||
raise
|
||||
|
||||
def create_repo(self, token: str, spec: RepoSpec) -> EnsureResult:
|
||||
base = self._api_base(spec.host)
|
||||
|
||||
payload: Dict[str, Any] = {
|
||||
"name": spec.name,
|
||||
"private": bool(spec.private),
|
||||
}
|
||||
if spec.description:
|
||||
payload["description"] = spec.description
|
||||
if spec.default_branch:
|
||||
payload["default_branch"] = spec.default_branch
|
||||
|
||||
org_url = f"{base}/api/v1/orgs/{spec.owner}/repos"
|
||||
user_url = f"{base}/api/v1/user/repos"
|
||||
|
||||
# Try org first, then fall back to user creation.
|
||||
try:
|
||||
resp = self._http.request_json(
|
||||
"POST",
|
||||
org_url,
|
||||
headers=self._headers(token),
|
||||
payload=payload,
|
||||
)
|
||||
if 200 <= resp.status < 300:
|
||||
html_url = (resp.json or {}).get("html_url") if resp.json else None
|
||||
return EnsureResult(
|
||||
status="created",
|
||||
message="Repository created (org).",
|
||||
url=str(html_url) if html_url else None,
|
||||
)
|
||||
except HttpError:
|
||||
# Typical org failures: 404 (not an org), 403 (no rights), 401 (bad token).
|
||||
pass
|
||||
|
||||
resp = self._http.request_json(
|
||||
"POST",
|
||||
user_url,
|
||||
headers=self._headers(token),
|
||||
payload=payload,
|
||||
)
|
||||
if 200 <= resp.status < 300:
|
||||
html_url = (resp.json or {}).get("html_url") if resp.json else None
|
||||
return EnsureResult(
|
||||
status="created",
|
||||
message="Repository created (user).",
|
||||
url=str(html_url) if html_url else None,
|
||||
)
|
||||
|
||||
return EnsureResult(
|
||||
status="failed",
|
||||
message=f"Failed to create repository (status {resp.status}).",
|
||||
)
|
||||
101
src/pkgmgr/core/remote_provisioning/providers/github.py
Normal file
101
src/pkgmgr/core/remote_provisioning/providers/github.py
Normal file
@@ -0,0 +1,101 @@
|
||||
# src/pkgmgr/core/remote_provisioning/providers/github.py
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any, Dict
|
||||
|
||||
from ..http.client import HttpClient
|
||||
from ..http.errors import HttpError
|
||||
from ..types import EnsureResult, RepoSpec
|
||||
from .base import RemoteProvider
|
||||
|
||||
|
||||
class GitHubProvider(RemoteProvider):
|
||||
"""GitHub provider using GitHub REST API."""
|
||||
|
||||
kind = "github"
|
||||
|
||||
def __init__(self, timeout_s: int = 15) -> None:
|
||||
self._http = HttpClient(timeout_s=timeout_s)
|
||||
|
||||
def can_handle(self, host: str) -> bool:
|
||||
h = host.lower()
|
||||
return h in ("github.com", "api.github.com") or h.endswith(".github.com")
|
||||
|
||||
def _api_base(self, host: str) -> str:
|
||||
"""
|
||||
GitHub API base:
|
||||
- Public GitHub: https://api.github.com
|
||||
- GitHub Enterprise Server: https://<host>/api/v3
|
||||
"""
|
||||
h = host.lower()
|
||||
if h in ("github.com", "api.github.com"):
|
||||
return "https://api.github.com"
|
||||
|
||||
# Enterprise instance:
|
||||
if host.startswith("http://") or host.startswith("https://"):
|
||||
return host.rstrip("/") + "/api/v3"
|
||||
return f"https://{host}/api/v3"
|
||||
|
||||
def _headers(self, token: str) -> Dict[str, str]:
|
||||
return {
|
||||
"Authorization": f"Bearer {token}",
|
||||
"Accept": "application/vnd.github+json",
|
||||
"User-Agent": "pkgmgr",
|
||||
}
|
||||
|
||||
def repo_exists(self, token: str, spec: RepoSpec) -> bool:
|
||||
api = self._api_base(spec.host)
|
||||
url = f"{api}/repos/{spec.owner}/{spec.name}"
|
||||
try:
|
||||
resp = self._http.request_json("GET", url, headers=self._headers(token))
|
||||
return 200 <= resp.status < 300
|
||||
except HttpError as exc:
|
||||
if exc.status == 404:
|
||||
return False
|
||||
raise
|
||||
|
||||
def create_repo(self, token: str, spec: RepoSpec) -> EnsureResult:
|
||||
api = self._api_base(spec.host)
|
||||
|
||||
payload: Dict[str, Any] = {
|
||||
"name": spec.name,
|
||||
"private": bool(spec.private),
|
||||
}
|
||||
if spec.description:
|
||||
payload["description"] = spec.description
|
||||
if spec.default_branch:
|
||||
payload["default_branch"] = spec.default_branch
|
||||
|
||||
org_url = f"{api}/orgs/{spec.owner}/repos"
|
||||
user_url = f"{api}/user/repos"
|
||||
|
||||
# Try org first, then fall back to user creation.
|
||||
try:
|
||||
resp = self._http.request_json(
|
||||
"POST", org_url, headers=self._headers(token), payload=payload
|
||||
)
|
||||
if 200 <= resp.status < 300:
|
||||
html_url = (resp.json or {}).get("html_url") if resp.json else None
|
||||
return EnsureResult(
|
||||
status="created",
|
||||
message="Repository created (org).",
|
||||
url=str(html_url) if html_url else None,
|
||||
)
|
||||
except HttpError:
|
||||
pass
|
||||
|
||||
resp = self._http.request_json(
|
||||
"POST", user_url, headers=self._headers(token), payload=payload
|
||||
)
|
||||
if 200 <= resp.status < 300:
|
||||
html_url = (resp.json or {}).get("html_url") if resp.json else None
|
||||
return EnsureResult(
|
||||
status="created",
|
||||
message="Repository created (user).",
|
||||
url=str(html_url) if html_url else None,
|
||||
)
|
||||
|
||||
return EnsureResult(
|
||||
status="failed",
|
||||
message=f"Failed to create repository (status {resp.status}).",
|
||||
)
|
||||
30
src/pkgmgr/core/remote_provisioning/registry.py
Normal file
30
src/pkgmgr/core/remote_provisioning/registry.py
Normal file
@@ -0,0 +1,30 @@
|
||||
# src/pkgmgr/core/remote_provisioning/registry.py
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import List, Optional
|
||||
|
||||
from .providers.base import RemoteProvider
|
||||
from .providers.gitea import GiteaProvider
|
||||
from .providers.github import GitHubProvider
|
||||
|
||||
|
||||
@dataclass
|
||||
class ProviderRegistry:
|
||||
"""Resolve the correct provider implementation for a host."""
|
||||
|
||||
providers: List[RemoteProvider]
|
||||
|
||||
@classmethod
|
||||
def default(cls) -> "ProviderRegistry":
|
||||
# Order matters: more specific providers first; fallback providers last.
|
||||
return cls(providers=[GitHubProvider(), GiteaProvider()])
|
||||
|
||||
def resolve(self, host: str) -> Optional[RemoteProvider]:
|
||||
for p in self.providers:
|
||||
try:
|
||||
if p.can_handle(host):
|
||||
return p
|
||||
except Exception:
|
||||
continue
|
||||
return None
|
||||
61
src/pkgmgr/core/remote_provisioning/types.py
Normal file
61
src/pkgmgr/core/remote_provisioning/types.py
Normal file
@@ -0,0 +1,61 @@
|
||||
# src/pkgmgr/core/remote_provisioning/types.py
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Literal, Optional
|
||||
|
||||
EnsureStatus = Literal["exists", "created", "skipped", "failed"]
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ProviderHint:
|
||||
"""Optional hint to force a provider kind."""
|
||||
|
||||
kind: Optional[str] = None # e.g. "gitea" or "github"
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class RepoSpec:
|
||||
"""Desired remote repository."""
|
||||
|
||||
host: str
|
||||
owner: str
|
||||
name: str
|
||||
private: bool = True
|
||||
description: str = ""
|
||||
default_branch: Optional[str] = None
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class EnsureResult:
|
||||
status: EnsureStatus
|
||||
message: str
|
||||
url: Optional[str] = None
|
||||
|
||||
|
||||
class RemoteProvisioningError(RuntimeError):
|
||||
"""Base class for remote provisioning errors."""
|
||||
|
||||
|
||||
class AuthError(RemoteProvisioningError):
|
||||
"""Authentication failed (401)."""
|
||||
|
||||
|
||||
class PermissionError(RemoteProvisioningError):
|
||||
"""Permission denied (403)."""
|
||||
|
||||
|
||||
class NotFoundError(RemoteProvisioningError):
|
||||
"""Resource not found (404)."""
|
||||
|
||||
|
||||
class PolicyError(RemoteProvisioningError):
|
||||
"""Provider/org policy prevents the operation."""
|
||||
|
||||
|
||||
class NetworkError(RemoteProvisioningError):
|
||||
"""Network/transport errors."""
|
||||
|
||||
|
||||
class UnsupportedProviderError(RemoteProvisioningError):
|
||||
"""No provider matched for the given host."""
|
||||
@@ -4,21 +4,21 @@
|
||||
"""
|
||||
E2E integration tests for the `pkgmgr mirror` command family.
|
||||
|
||||
This test class covers:
|
||||
Covered commands:
|
||||
|
||||
- pkgmgr mirror --help
|
||||
- pkgmgr mirror list --preview --all
|
||||
- pkgmgr mirror diff --preview --all
|
||||
- pkgmgr mirror merge config file --preview --all
|
||||
- pkgmgr mirror setup --preview --all
|
||||
- pkgmgr mirror check --preview --all
|
||||
- pkgmgr mirror provision --preview --all
|
||||
|
||||
All of these subcommands are fully wired at CLI level and do not
|
||||
require mocks. With --preview, merge and setup do not perform
|
||||
destructive actions, making them safe for CI execution.
|
||||
All commands are executed via the real CLI entry point (main module).
|
||||
With --preview enabled, all operations are non-destructive and safe
|
||||
to run inside CI containers.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import io
|
||||
import runpy
|
||||
import sys
|
||||
@@ -28,25 +28,25 @@ from contextlib import redirect_stdout, redirect_stderr
|
||||
|
||||
class TestIntegrationMirrorCommands(unittest.TestCase):
|
||||
"""
|
||||
E2E tests for `pkgmgr mirror` commands.
|
||||
End-to-end tests for `pkgmgr mirror` commands.
|
||||
"""
|
||||
|
||||
# ------------------------------------------------------------
|
||||
# Helper
|
||||
# ------------------------------------------------------------
|
||||
def _run_pkgmgr(self, args: list[str]) -> str:
|
||||
def _run_pkgmgr(self, args):
|
||||
"""
|
||||
Execute pkgmgr with the given arguments and return captured stdout+stderr.
|
||||
Execute pkgmgr with the given arguments and return captured output.
|
||||
|
||||
- Treat SystemExit(0) or SystemExit(None) as success.
|
||||
- Convert non-zero exit codes into AssertionError.
|
||||
- Any other exit code is considered a test failure.
|
||||
"""
|
||||
original_argv = list(sys.argv)
|
||||
buffer = io.StringIO()
|
||||
cmd_repr = "pkgmgr " + " ".join(args)
|
||||
|
||||
try:
|
||||
sys.argv = ["pkgmgr"] + args
|
||||
sys.argv = ["pkgmgr"] + list(args)
|
||||
|
||||
try:
|
||||
with redirect_stdout(buffer), redirect_stderr(buffer):
|
||||
@@ -55,9 +55,9 @@ class TestIntegrationMirrorCommands(unittest.TestCase):
|
||||
code = exc.code if isinstance(exc.code, int) else None
|
||||
if code not in (0, None):
|
||||
raise AssertionError(
|
||||
f"{cmd_repr!r} failed with exit code {exc.code}. "
|
||||
"Scroll up to inspect the pkgmgr output."
|
||||
) from exc
|
||||
"%r failed with exit code %r.\n\nOutput:\n%s"
|
||||
% (cmd_repr, exc.code, buffer.getvalue())
|
||||
)
|
||||
|
||||
return buffer.getvalue()
|
||||
|
||||
@@ -68,44 +68,41 @@ class TestIntegrationMirrorCommands(unittest.TestCase):
|
||||
# Tests
|
||||
# ------------------------------------------------------------
|
||||
|
||||
def test_mirror_help(self) -> None:
|
||||
def test_mirror_help(self):
|
||||
"""
|
||||
Ensure `pkgmgr mirror --help` runs successfully
|
||||
and prints a usage message for the mirror command.
|
||||
`pkgmgr mirror --help` should run without error and print usage info.
|
||||
"""
|
||||
output = self._run_pkgmgr(["mirror", "--help"])
|
||||
self.assertIn("usage:", output)
|
||||
self.assertIn("pkgmgr mirror", output)
|
||||
|
||||
def test_mirror_list_preview_all(self) -> None:
|
||||
def test_mirror_list_preview_all(self):
|
||||
"""
|
||||
`pkgmgr mirror list --preview --all` should run without error
|
||||
and produce some output for the selected repositories.
|
||||
`pkgmgr mirror list --preview --all`
|
||||
"""
|
||||
output = self._run_pkgmgr(["mirror", "list", "--preview", "--all"])
|
||||
# Do not assert specific wording; just ensure something was printed.
|
||||
output = self._run_pkgmgr(
|
||||
["mirror", "list", "--preview", "--all"]
|
||||
)
|
||||
self.assertTrue(
|
||||
output.strip(),
|
||||
msg="Expected `pkgmgr mirror list --preview --all` to produce output.",
|
||||
"Expected output from mirror list",
|
||||
)
|
||||
|
||||
def test_mirror_diff_preview_all(self) -> None:
|
||||
def test_mirror_diff_preview_all(self):
|
||||
"""
|
||||
`pkgmgr mirror diff --preview --all` should run without error
|
||||
and produce some diagnostic output (diff header, etc.).
|
||||
`pkgmgr mirror diff --preview --all`
|
||||
"""
|
||||
output = self._run_pkgmgr(["mirror", "diff", "--preview", "--all"])
|
||||
output = self._run_pkgmgr(
|
||||
["mirror", "diff", "--preview", "--all"]
|
||||
)
|
||||
self.assertTrue(
|
||||
output.strip(),
|
||||
msg="Expected `pkgmgr mirror diff --preview --all` to produce output.",
|
||||
"Expected output from mirror diff",
|
||||
)
|
||||
|
||||
def test_mirror_merge_config_to_file_preview_all(self) -> None:
|
||||
def test_mirror_merge_config_to_file_preview_all(self):
|
||||
"""
|
||||
`pkgmgr mirror merge config file --preview --all` should run without error.
|
||||
|
||||
In preview mode this does not change either config or MIRRORS files;
|
||||
it only prints what would be merged.
|
||||
`pkgmgr mirror merge config file --preview --all`
|
||||
"""
|
||||
output = self._run_pkgmgr(
|
||||
[
|
||||
@@ -119,23 +116,47 @@ class TestIntegrationMirrorCommands(unittest.TestCase):
|
||||
)
|
||||
self.assertTrue(
|
||||
output.strip(),
|
||||
msg=(
|
||||
"Expected `pkgmgr mirror merge config file --preview --all` "
|
||||
"to produce output."
|
||||
),
|
||||
"Expected output from mirror merge (config -> file)",
|
||||
)
|
||||
|
||||
def test_mirror_setup_preview_all(self) -> None:
|
||||
def test_mirror_setup_preview_all(self):
|
||||
"""
|
||||
`pkgmgr mirror setup --preview --all` should run without error.
|
||||
|
||||
In preview mode only the intended Git operations and remote
|
||||
suggestions are printed; no real changes are made.
|
||||
`pkgmgr mirror setup --preview --all`
|
||||
"""
|
||||
output = self._run_pkgmgr(["mirror", "setup", "--preview", "--all"])
|
||||
output = self._run_pkgmgr(
|
||||
["mirror", "setup", "--preview", "--all"]
|
||||
)
|
||||
self.assertTrue(
|
||||
output.strip(),
|
||||
msg="Expected `pkgmgr mirror setup --preview --all` to produce output.",
|
||||
"Expected output from mirror setup",
|
||||
)
|
||||
|
||||
def test_mirror_check_preview_all(self):
|
||||
"""
|
||||
`pkgmgr mirror check --preview --all`
|
||||
|
||||
Performs non-destructive remote checks (git ls-remote).
|
||||
"""
|
||||
output = self._run_pkgmgr(
|
||||
["mirror", "check", "--preview", "--all"]
|
||||
)
|
||||
self.assertTrue(
|
||||
output.strip(),
|
||||
"Expected output from mirror check",
|
||||
)
|
||||
|
||||
def test_mirror_provision_preview_all(self):
|
||||
"""
|
||||
`pkgmgr mirror provision --preview --all`
|
||||
|
||||
In preview mode this MUST NOT create remote repositories.
|
||||
"""
|
||||
output = self._run_pkgmgr(
|
||||
["mirror", "provision", "--preview", "--all"]
|
||||
)
|
||||
self.assertTrue(
|
||||
output.strip(),
|
||||
"Expected output from mirror provision (preview)",
|
||||
)
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user