**feat(mirror,credentials): improve remote provisioning UX and token handling**
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / codesniffer-shellcheck (push) Has been cancelled
Mark stable commit / codesniffer-ruff (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled

* Split mirror logic into atomic modules (remote check, provisioning, URL utils)
* Normalize Git remote URLs and provider host detection
* Add provider-specific token help URLs (GitHub, Gitea/Forgejo, GitLab)
* Improve keyring handling with clear warnings and install hints
* Gracefully fall back to prompt when keyring is unavailable
* Fix provider hint override logic during remote provisioning
This commit is contained in:
Kevin Veen-Birkenbach
2025-12-14 14:48:05 +01:00
parent 0e03fbbee2
commit 0d652d995e
8 changed files with 260 additions and 148 deletions

View File

@@ -0,0 +1,21 @@
# src/pkgmgr/actions/mirror/remote_check.py
from __future__ import annotations
from typing import Tuple
from pkgmgr.core.git import GitError, run_git
def probe_mirror(url: str, repo_dir: str) -> Tuple[bool, str]:
"""
Probe a remote mirror URL using `git ls-remote`.
Returns:
(True, "") on success,
(False, error_message) on failure.
"""
try:
run_git(["ls-remote", url], cwd=repo_dir)
return True, ""
except GitError as exc:
return False, str(exc)

View File

@@ -0,0 +1,77 @@
# src/pkgmgr/actions/mirror/remote_provision.py
from __future__ import annotations
from typing import List
from pkgmgr.core.remote_provisioning import ProviderHint, RepoSpec, ensure_remote_repo
from pkgmgr.core.remote_provisioning.ensure import EnsureOptions
from .context import build_context
from .git_remote import determine_primary_remote_url
from .types import Repository
from .url_utils import hostport_from_git_url, normalize_provider_host
def ensure_remote_repository(
repo: Repository,
repositories_base_dir: str,
all_repos: List[Repository],
preview: bool,
) -> None:
"""
Ensure that the remote repository exists using provider APIs.
This is ONLY called when ensure_remote=True.
"""
ctx = build_context(repo, repositories_base_dir, all_repos)
resolved_mirrors = ctx.resolved_mirrors
primary_url = determine_primary_remote_url(repo, resolved_mirrors)
if not primary_url:
print("[INFO] No remote URL could be derived; skipping remote provisioning.")
return
host_raw, _port = hostport_from_git_url(primary_url)
host = normalize_provider_host(host_raw)
owner = repo.get("account")
name = repo.get("repository")
if not host or not owner or not name:
print("[WARN] Missing host/account/repository; cannot ensure remote repo.")
print(f" host={host!r}, account={owner!r}, repository={name!r}")
return
print("------------------------------------------------------------")
print(f"[REMOTE ENSURE] {ctx.identifier}")
print(f"[REMOTE ENSURE] host: {host}")
print("------------------------------------------------------------")
spec = RepoSpec(
host=str(host),
owner=str(owner),
name=str(name),
private=bool(repo.get("private", True)),
description=str(repo.get("description", "")),
)
provider_kind = str(repo.get("provider", "")).strip().lower() or None
try:
result = ensure_remote_repo(
spec,
provider_hint=ProviderHint(kind=provider_kind),
options=EnsureOptions(
preview=preview,
interactive=True,
allow_prompt=True,
save_prompt_token_to_keyring=True,
),
)
print(f"[REMOTE ENSURE] {result.status.upper()}: {result.message}")
if result.url:
print(f"[REMOTE ENSURE] URL: {result.url}")
except Exception as exc: # noqa: BLE001
print(f"[ERROR] Remote provisioning failed: {exc}")
print()

View File

@@ -1,131 +1,21 @@
# src/pkgmgr/actions/mirror/setup_cmd.py
from __future__ import annotations
from typing import List, Tuple
from urllib.parse import urlparse
from pkgmgr.core.git import GitError, run_git
from pkgmgr.core.remote_provisioning import ProviderHint, RepoSpec, ensure_remote_repo
from pkgmgr.core.remote_provisioning.ensure import EnsureOptions
from typing import List
from .context import build_context
from .git_remote import determine_primary_remote_url, ensure_origin_remote
from .git_remote import ensure_origin_remote
from .remote_check import probe_mirror
from .remote_provision import ensure_remote_repository
from .types import Repository
def _probe_mirror(url: str, repo_dir: str) -> Tuple[bool, str]:
"""
Probe a remote mirror URL using `git ls-remote`.
Returns:
(True, "") on success,
(False, error_message) on failure.
"""
try:
run_git(["ls-remote", url], cwd=repo_dir)
return True, ""
except GitError as exc:
return False, str(exc)
def _host_from_git_url(url: str) -> str:
url = (url or "").strip()
if not url:
return ""
if "://" in url:
parsed = urlparse(url)
netloc = (parsed.netloc or "").strip()
if "@" in netloc:
netloc = netloc.split("@", 1)[1]
# keep optional :port
return netloc
# scp-like: git@host:owner/repo.git
if "@" in url and ":" in url:
after_at = url.split("@", 1)[1]
host = after_at.split(":", 1)[0]
return host.strip()
return url.split("/", 1)[0].strip()
def _ensure_remote_repository(
repo: Repository,
repositories_base_dir: str,
all_repos: List[Repository],
preview: bool,
) -> None:
"""
Ensure that the remote repository exists using provider APIs.
This is ONLY called when ensure_remote=True.
"""
ctx = build_context(repo, repositories_base_dir, all_repos)
resolved_mirrors = ctx.resolved_mirrors
primary_url = determine_primary_remote_url(repo, resolved_mirrors)
if not primary_url:
print("[INFO] No remote URL could be derived; skipping remote provisioning.")
return
# IMPORTANT:
# - repo["provider"] is typically a provider *kind* (e.g. "github" / "gitea"),
# NOT a hostname. We derive the actual host from the remote URL.
host = _host_from_git_url(primary_url)
owner = repo.get("account")
name = repo.get("repository")
if not host or not owner or not name:
print("[WARN] Missing host/account/repository; cannot ensure remote repo.")
print(f" host={host!r}, account={owner!r}, repository={name!r}")
return
print("------------------------------------------------------------")
print(f"[REMOTE ENSURE] {ctx.identifier}")
print(f"[REMOTE ENSURE] host: {host}")
print("------------------------------------------------------------")
spec = RepoSpec(
host=str(host),
owner=str(owner),
name=str(name),
private=bool(repo.get("private", True)),
description=str(repo.get("description", "")),
)
provider_kind = str(repo.get("provider", "")).strip().lower() or None
try:
result = ensure_remote_repo(
spec,
provider_hint=ProviderHint(kind=provider_kind),
options=EnsureOptions(
preview=preview,
interactive=True,
allow_prompt=True,
save_prompt_token_to_keyring=True,
),
)
print(f"[REMOTE ENSURE] {result.status.upper()}: {result.message}")
if result.url:
print(f"[REMOTE ENSURE] URL: {result.url}")
except Exception as exc: # noqa: BLE001
# Keep action layer resilient
print(f"[ERROR] Remote provisioning failed: {exc}")
print()
def _setup_local_mirrors_for_repo(
repo: Repository,
repositories_base_dir: str,
all_repos: List[Repository],
preview: bool,
) -> None:
"""
Local setup:
- Ensure 'origin' remote exists and is sane
"""
ctx = build_context(repo, repositories_base_dir, all_repos)
print("------------------------------------------------------------")
@@ -144,17 +34,6 @@ def _setup_remote_mirrors_for_repo(
preview: bool,
ensure_remote: bool,
) -> None:
"""
Remote-side setup / validation.
Default behavior:
- Non-destructive checks using `git ls-remote`.
Optional behavior:
- If ensure_remote=True:
* Attempt to create missing repositories via provider API
* Uses TokenResolver (ENV -> keyring -> prompt)
"""
ctx = build_context(repo, repositories_base_dir, all_repos)
resolved_mirrors = ctx.resolved_mirrors
@@ -164,7 +43,7 @@ def _setup_remote_mirrors_for_repo(
print("------------------------------------------------------------")
if ensure_remote:
_ensure_remote_repository(
ensure_remote_repository(
repo,
repositories_base_dir=repositories_base_dir,
all_repos=all_repos,
@@ -178,7 +57,7 @@ def _setup_remote_mirrors_for_repo(
print()
return
ok, error_message = _probe_mirror(primary_url, ctx.repo_dir)
ok, error_message = probe_mirror(primary_url, ctx.repo_dir)
if ok:
print(f"[OK] primary: {primary_url}")
else:
@@ -190,7 +69,7 @@ def _setup_remote_mirrors_for_repo(
return
for name, url in sorted(resolved_mirrors.items()):
ok, error_message = _probe_mirror(url, ctx.repo_dir)
ok, error_message = probe_mirror(url, ctx.repo_dir)
if ok:
print(f"[OK] {name}: {url}")
else:
@@ -210,19 +89,6 @@ def setup_mirrors(
remote: bool = True,
ensure_remote: bool = False,
) -> None:
"""
Setup mirrors for the selected repositories.
local:
- Configure local Git remotes (ensure 'origin' exists).
remote:
- Non-destructive remote checks using `git ls-remote`.
ensure_remote:
- If True, attempt to create missing remote repositories via provider APIs.
- This is explicit and NEVER enabled implicitly.
"""
for repo in selected_repos:
if local:
_setup_local_mirrors_for_repo(

View File

@@ -0,0 +1,67 @@
# src/pkgmgr/actions/mirror/url_utils.py
from __future__ import annotations
from urllib.parse import urlparse
from typing import Optional, Tuple
def hostport_from_git_url(url: str) -> Tuple[str, Optional[str]]:
"""
Extract (host, port) from common Git remote URL formats.
Supports:
- ssh://git@host:2201/owner/repo.git
- https://host/owner/repo.git
- git@host:owner/repo.git (scp-like; no explicit port)
"""
url = (url or "").strip()
if not url:
return "", None
if "://" in url:
parsed = urlparse(url)
netloc = (parsed.netloc or "").strip()
if "@" in netloc:
netloc = netloc.split("@", 1)[1]
# IPv6 bracket form: [::1]:2222
if netloc.startswith("[") and "]" in netloc:
host = netloc[1:netloc.index("]")]
rest = netloc[netloc.index("]") + 1 :]
port = rest[1:] if rest.startswith(":") else None
return host.strip(), (port.strip() if port else None)
if ":" in netloc:
host, port = netloc.rsplit(":", 1)
return host.strip(), (port.strip() or None)
return netloc.strip(), None
# scp-like: git@host:owner/repo.git
if "@" in url and ":" in url:
after_at = url.split("@", 1)[1]
host = after_at.split(":", 1)[0].strip()
return host, None
host = url.split("/", 1)[0].strip()
return host, None
def normalize_provider_host(host: str) -> str:
"""
Normalize host for provider matching:
- strip brackets
- strip optional :port
- lowercase
"""
host = (host or "").strip()
if not host:
return ""
if host.startswith("[") and "]" in host:
host = host[1:host.index("]")]
if ":" in host and host.count(":") == 1:
host = host.rsplit(":", 1)[0]
return host.strip().lower()

View File

@@ -9,15 +9,33 @@ from ..types import KeyringUnavailableError, TokenRequest, TokenResult
def _import_keyring():
"""
Import python-keyring.
Raises:
KeyringUnavailableError if:
- library is missing
- no backend is configured / usable
- import fails for any reason
"""
try:
import keyring # type: ignore
return keyring
except Exception as exc: # noqa: BLE001
raise KeyringUnavailableError(
"python-keyring is not available or no backend is configured."
"python-keyring is not installed."
) from exc
# Some environments have keyring installed but no usable backend.
# We do a lightweight "backend sanity check" by attempting to read the backend.
try:
_ = keyring.get_keyring()
except Exception as exc: # noqa: BLE001
raise KeyringUnavailableError(
"python-keyring is installed but no usable keyring backend is configured."
) from exc
return keyring
@dataclass(frozen=True)
class KeyringTokenProvider:

View File

@@ -9,6 +9,37 @@ from typing import Optional
from ..types import TokenRequest, TokenResult
def _token_help_url(provider_kind: str, host: str) -> Optional[str]:
"""
Return a provider-specific URL where a user can create/get an API token.
Keep this conservative and stable:
- GitHub: official token settings URL
- Gitea/Forgejo: common settings path on the given host
- GitLab: common personal access token path
"""
kind = (provider_kind or "").strip().lower()
h = (host or "").strip()
# GitHub (cloud)
if kind == "github":
return "https://github.com/settings/tokens"
# Gitea / Forgejo (self-hosted)
if kind in ("gitea", "forgejo"):
# Typical UI path: Settings -> Applications -> Access Tokens
# In many installations this is available at /user/settings/applications
base = f"https://{h}".rstrip("/")
return f"{base}/user/settings/applications"
# GitLab (cloud or self-hosted)
if kind == "gitlab":
base = "https://gitlab.com" if not h else f"https://{h}".rstrip("/")
return f"{base}/-/profile/personal_access_tokens"
return None
@dataclass(frozen=True)
class PromptTokenProvider:
"""Interactively prompt for a token.
@@ -25,6 +56,11 @@ class PromptTokenProvider:
return None
owner_info = f" (owner: {request.owner})" if request.owner else ""
help_url = _token_help_url(request.provider_kind, request.host)
if help_url:
print(f"[INFO] Create/get your token here: {help_url}")
prompt = f"Enter API token for {request.provider_kind} on {request.host}{owner_info}: "
token = (getpass(prompt) or "").strip()
if not token:

View File

@@ -1,13 +1,14 @@
# src/pkgmgr/core/credentials/resolver.py
from __future__ import annotations
import sys
from dataclasses import dataclass
from typing import Optional
from .providers.env import EnvTokenProvider
from .providers.keyring import KeyringTokenProvider
from .providers.prompt import PromptTokenProvider
from .types import NoCredentialsError, TokenRequest, TokenResult
from .types import KeyringUnavailableError, NoCredentialsError, TokenRequest, TokenResult
@dataclass(frozen=True)
@@ -26,6 +27,26 @@ class TokenResolver:
self._env = EnvTokenProvider()
self._keyring = KeyringTokenProvider()
self._prompt = PromptTokenProvider()
self._warned_keyring: bool = False
def _warn_keyring_unavailable(self, exc: Exception) -> None:
if self._warned_keyring:
return
self._warned_keyring = True
msg = str(exc).strip() or "Keyring is unavailable."
print("[WARN] Keyring support is not available.", file=sys.stderr)
print(f" {msg}", file=sys.stderr)
print(" Tokens will NOT be persisted securely.", file=sys.stderr)
print("", file=sys.stderr)
print(" To enable secure token storage, install python-keyring:", file=sys.stderr)
print(" pip install keyring", file=sys.stderr)
print("", file=sys.stderr)
print(" Or install via system packages:", file=sys.stderr)
print(" sudo apt install python3-keyring", file=sys.stderr)
print(" sudo pacman -S python-keyring", file=sys.stderr)
print(" sudo dnf install python3-keyring", file=sys.stderr)
print("", file=sys.stderr)
def get_token(
self,
@@ -47,9 +68,11 @@ class TokenResolver:
kr_res = self._keyring.get(request)
if kr_res:
return kr_res
except KeyringUnavailableError as exc:
# Show a helpful warning once, then continue (prompt fallback).
self._warn_keyring_unavailable(exc)
except Exception:
# Keyring missing/unavailable: ignore to allow prompt (workstations)
# or to fail cleanly below (headless CI without prompt).
# Unknown keyring errors: do not block prompting; still avoid hard crash.
pass
# 3) Prompt (optional)
@@ -59,6 +82,8 @@ class TokenResolver:
if opts.save_prompt_token_to_keyring:
try:
self._keyring.set(request, prompt_res.token)
except KeyringUnavailableError as exc:
self._warn_keyring_unavailable(exc)
except Exception:
# If keyring cannot store, still use token for this run.
pass

View File

@@ -64,10 +64,12 @@ def ensure_remote_repo(
provider = reg.resolve(spec.host)
if provider_hint and provider_hint.kind:
forced = provider_hint.kind.strip().lower()
provider = next(
forced_provider = next(
(p for p in reg.providers if getattr(p, "kind", "").lower() == forced),
None,
)
if forced_provider is not None:
provider = forced_provider
if provider is None:
raise UnsupportedProviderError(f"No provider matched host: {spec.host}")