refactor(mirror): probe remotes with detailed reasons and provision all git mirrors
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / lint-shell (push) Has been cancelled
Mark stable commit / lint-python (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / lint-shell (push) Has been cancelled
Mark stable commit / lint-python (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
- Add probe_remote_reachable_detail and improved GitRunError metadata - Print short failure reasons for unreachable remotes - Provision each git mirror URL via ensure_remote_repository_for_url https://chatgpt.com/share/6946956e-f738-800f-a446-e2c8bf5595f4
This commit is contained in:
@@ -11,35 +11,37 @@ from .types import Repository
|
||||
from .url_utils import normalize_provider_host, parse_repo_from_git_url
|
||||
|
||||
|
||||
def ensure_remote_repository(
|
||||
repo: Repository,
|
||||
repositories_base_dir: str,
|
||||
all_repos: List[Repository],
|
||||
def _provider_hint_from_host(host: str) -> str | None:
|
||||
h = (host or "").lower()
|
||||
if h == "github.com":
|
||||
return "github"
|
||||
# Best-effort default for self-hosted git domains
|
||||
return "gitea" if h else None
|
||||
|
||||
|
||||
def ensure_remote_repository_for_url(
|
||||
*,
|
||||
url: str,
|
||||
private_default: bool,
|
||||
description: str,
|
||||
preview: bool,
|
||||
) -> None:
|
||||
ctx = build_context(repo, repositories_base_dir, all_repos)
|
||||
|
||||
primary_url = determine_primary_remote_url(repo, ctx)
|
||||
if not primary_url:
|
||||
print("[INFO] No primary URL found; skipping remote provisioning.")
|
||||
return
|
||||
|
||||
host_raw, owner, name = parse_repo_from_git_url(primary_url)
|
||||
host_raw, owner, name = parse_repo_from_git_url(url)
|
||||
host = normalize_provider_host(host_raw)
|
||||
|
||||
if not host or not owner or not name:
|
||||
print("[WARN] Could not parse remote URL:", primary_url)
|
||||
print(f"[WARN] Could not parse repo from URL: {url}")
|
||||
return
|
||||
|
||||
spec = RepoSpec(
|
||||
host=host,
|
||||
owner=owner,
|
||||
name=name,
|
||||
private=bool(repo.get("private", True)),
|
||||
description=str(repo.get("description", "")),
|
||||
private=private_default,
|
||||
description=description,
|
||||
)
|
||||
|
||||
provider_kind = str(repo.get("provider", "")).lower() or None
|
||||
provider_kind = _provider_hint_from_host(host)
|
||||
|
||||
try:
|
||||
result = ensure_remote_repo(
|
||||
@@ -56,4 +58,29 @@ def ensure_remote_repository(
|
||||
if result.url:
|
||||
print(f"[REMOTE ENSURE] URL: {result.url}")
|
||||
except Exception as exc: # noqa: BLE001
|
||||
print(f"[ERROR] Remote provisioning failed: {exc}")
|
||||
print(f"[ERROR] Remote provisioning failed for {url!r}: {exc}")
|
||||
|
||||
|
||||
def ensure_remote_repository(
|
||||
repo: Repository,
|
||||
repositories_base_dir: str,
|
||||
all_repos: List[Repository],
|
||||
preview: bool,
|
||||
) -> None:
|
||||
"""
|
||||
Backwards-compatible wrapper: ensure the *primary* remote repository
|
||||
derived from the primary URL.
|
||||
"""
|
||||
ctx = build_context(repo, repositories_base_dir, all_repos)
|
||||
|
||||
primary_url = determine_primary_remote_url(repo, ctx)
|
||||
if not primary_url:
|
||||
print("[INFO] No primary URL found; skipping remote provisioning.")
|
||||
return
|
||||
|
||||
ensure_remote_repository_for_url(
|
||||
url=primary_url,
|
||||
private_default=bool(repo.get("private", True)),
|
||||
description=str(repo.get("description", "")),
|
||||
preview=preview,
|
||||
)
|
||||
|
||||
@@ -2,11 +2,11 @@ from __future__ import annotations
|
||||
|
||||
from typing import List
|
||||
|
||||
from pkgmgr.core.git.queries import probe_remote_reachable
|
||||
from pkgmgr.core.git.queries import probe_remote_reachable_detail
|
||||
|
||||
from .context import build_context
|
||||
from .git_remote import ensure_origin_remote, determine_primary_remote_url
|
||||
from .remote_provision import ensure_remote_repository
|
||||
from .git_remote import determine_primary_remote_url, ensure_origin_remote
|
||||
from .remote_provision import ensure_remote_repository_for_url
|
||||
from .types import Repository
|
||||
|
||||
|
||||
@@ -25,6 +25,25 @@ def _is_git_remote_url(url: str) -> bool:
|
||||
return False
|
||||
|
||||
|
||||
def _print_probe_result(name: str | None, url: str, *, cwd: str) -> None:
|
||||
"""
|
||||
Print probe result for a git remote URL, including a short failure reason.
|
||||
"""
|
||||
ok, reason = probe_remote_reachable_detail(url, cwd=cwd)
|
||||
|
||||
prefix = f"{name}: " if name else ""
|
||||
if ok:
|
||||
print(f"[OK] {prefix}{url}")
|
||||
return
|
||||
|
||||
print(f"[WARN] {prefix}{url}")
|
||||
if reason:
|
||||
reason = reason.strip()
|
||||
if len(reason) > 240:
|
||||
reason = reason[:240].rstrip() + "…"
|
||||
print(f" reason: {reason}")
|
||||
|
||||
|
||||
def _setup_local_mirrors_for_repo(
|
||||
repo: Repository,
|
||||
repositories_base_dir: str,
|
||||
@@ -56,35 +75,47 @@ def _setup_remote_mirrors_for_repo(
|
||||
print(f"[MIRROR SETUP:REMOTE] dir: {ctx.repo_dir}")
|
||||
print("------------------------------------------------------------")
|
||||
|
||||
if ensure_remote:
|
||||
ensure_remote_repository(
|
||||
repo,
|
||||
repositories_base_dir,
|
||||
all_repos,
|
||||
preview,
|
||||
)
|
||||
|
||||
# Probe only git URLs (do not try ls-remote against PyPI etc.)
|
||||
# If there are no mirrors at all, probe the primary git URL.
|
||||
git_mirrors = {
|
||||
k: v for k, v in ctx.resolved_mirrors.items() if _is_git_remote_url(v)
|
||||
}
|
||||
|
||||
# If there are no git mirrors, fall back to primary (git) URL.
|
||||
if not git_mirrors:
|
||||
primary = determine_primary_remote_url(repo, ctx)
|
||||
if not primary or not _is_git_remote_url(primary):
|
||||
print("[INFO] No git mirrors to probe.")
|
||||
print("[INFO] No git mirrors to probe or provision.")
|
||||
print()
|
||||
return
|
||||
|
||||
ok = probe_remote_reachable(primary, cwd=ctx.repo_dir)
|
||||
print("[OK]" if ok else "[WARN]", primary)
|
||||
if ensure_remote:
|
||||
print(f"[REMOTE ENSURE] ensuring primary: {primary}")
|
||||
ensure_remote_repository_for_url(
|
||||
url=primary,
|
||||
private_default=bool(repo.get("private", True)),
|
||||
description=str(repo.get("description", "")),
|
||||
preview=preview,
|
||||
)
|
||||
print()
|
||||
|
||||
_print_probe_result(None, primary, cwd=ctx.repo_dir)
|
||||
print()
|
||||
return
|
||||
|
||||
# Provision ALL git mirrors (if requested)
|
||||
if ensure_remote:
|
||||
for name, url in git_mirrors.items():
|
||||
print(f"[REMOTE ENSURE] ensuring mirror {name!r}: {url}")
|
||||
ensure_remote_repository_for_url(
|
||||
url=url,
|
||||
private_default=bool(repo.get("private", True)),
|
||||
description=str(repo.get("description", "")),
|
||||
preview=preview,
|
||||
)
|
||||
print()
|
||||
|
||||
# Probe ALL git mirrors
|
||||
for name, url in git_mirrors.items():
|
||||
ok = probe_remote_reachable(url, cwd=ctx.repo_dir)
|
||||
print(f"[OK] {name}: {url}" if ok else f"[WARN] {name}: {url}")
|
||||
_print_probe_result(name, url, cwd=ctx.repo_dir)
|
||||
|
||||
print()
|
||||
|
||||
|
||||
@@ -20,7 +20,10 @@ from .get_tags_at_ref import GitTagsAtRefQueryError, get_tags_at_ref
|
||||
from .get_upstream_ref import get_upstream_ref
|
||||
from .list_remotes import list_remotes
|
||||
from .list_tags import list_tags
|
||||
from .probe_remote_reachable import probe_remote_reachable
|
||||
from .probe_remote_reachable import (
|
||||
probe_remote_reachable,
|
||||
probe_remote_reachable_detail,
|
||||
)
|
||||
from .resolve_base_branch import GitBaseBranchNotFoundError, resolve_base_branch
|
||||
|
||||
__all__ = [
|
||||
@@ -37,6 +40,7 @@ __all__ = [
|
||||
"list_remotes",
|
||||
"get_remote_push_urls",
|
||||
"probe_remote_reachable",
|
||||
"probe_remote_reachable_detail",
|
||||
"get_changelog",
|
||||
"GitChangelogQueryError",
|
||||
"get_tags_at_ref",
|
||||
|
||||
@@ -1,21 +1,121 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Tuple
|
||||
|
||||
from ..errors import GitRunError
|
||||
from ..run import run
|
||||
|
||||
|
||||
def probe_remote_reachable(url: str, cwd: str = ".") -> bool:
|
||||
def _first_useful_line(text: str) -> str:
|
||||
lines: list[str] = []
|
||||
for line in (text or "").splitlines():
|
||||
s = line.strip()
|
||||
if s:
|
||||
lines.append(s)
|
||||
|
||||
if not lines:
|
||||
return ""
|
||||
|
||||
preferred_keywords = (
|
||||
"fatal:",
|
||||
"permission denied",
|
||||
"repository not found",
|
||||
"could not read from remote repository",
|
||||
"connection refused",
|
||||
"connection timed out",
|
||||
"no route to host",
|
||||
"name or service not known",
|
||||
"temporary failure in name resolution",
|
||||
"host key verification failed",
|
||||
"could not resolve hostname",
|
||||
"authentication failed",
|
||||
"publickey",
|
||||
"the authenticity of host",
|
||||
"known_hosts",
|
||||
)
|
||||
for s in lines:
|
||||
low = s.lower()
|
||||
if any(k in low for k in preferred_keywords):
|
||||
return s
|
||||
|
||||
# Avoid returning a meaningless "error:" if possible
|
||||
for s in lines:
|
||||
if s.lower() not in ("error:", "error"):
|
||||
return s
|
||||
|
||||
return lines[0]
|
||||
|
||||
|
||||
def _looks_like_real_transport_error(text: str) -> bool:
|
||||
"""
|
||||
Check whether a remote URL is reachable.
|
||||
True if stderr/stdout contains strong indicators that the remote is NOT usable.
|
||||
"""
|
||||
low = (text or "").lower()
|
||||
indicators = (
|
||||
"repository not found",
|
||||
"could not read from remote repository",
|
||||
"permission denied",
|
||||
"authentication failed",
|
||||
"publickey",
|
||||
"host key verification failed",
|
||||
"could not resolve hostname",
|
||||
"name or service not known",
|
||||
"connection refused",
|
||||
"connection timed out",
|
||||
"no route to host",
|
||||
)
|
||||
return any(i in low for i in indicators)
|
||||
|
||||
Equivalent to:
|
||||
git ls-remote --exit-code <url>
|
||||
|
||||
Returns:
|
||||
True if reachable, False otherwise.
|
||||
def _format_reason(exc: GitRunError, *, url: str) -> str:
|
||||
stderr = getattr(exc, "stderr", "") or ""
|
||||
stdout = getattr(exc, "stdout", "") or ""
|
||||
rc = getattr(exc, "returncode", None)
|
||||
|
||||
reason = (
|
||||
_first_useful_line(stderr)
|
||||
or _first_useful_line(stdout)
|
||||
or _first_useful_line(str(exc))
|
||||
)
|
||||
|
||||
if rc is not None:
|
||||
reason = f"(exit {rc}) {reason}".strip() if reason else f"(exit {rc})"
|
||||
|
||||
# If we still have nothing useful, provide a hint to debug SSH transport
|
||||
if not reason or reason.lower() in ("(exit 2)", "(exit 128)"):
|
||||
reason = (
|
||||
f"{reason} | hint: run "
|
||||
f"GIT_SSH_COMMAND='ssh -vvv' git ls-remote --exit-code {url!r}"
|
||||
).strip()
|
||||
|
||||
return reason.strip()
|
||||
|
||||
|
||||
def probe_remote_reachable_detail(url: str, cwd: str = ".") -> Tuple[bool, str]:
|
||||
"""
|
||||
Probe whether a remote URL is reachable.
|
||||
|
||||
Implementation detail:
|
||||
- We run `git ls-remote --exit-code <url>`.
|
||||
- Git may return exit code 2 when the remote is reachable but no refs exist
|
||||
(e.g. an empty repository). We treat that as reachable.
|
||||
"""
|
||||
try:
|
||||
run(["ls-remote", "--exit-code", url], cwd=cwd)
|
||||
return True
|
||||
except GitRunError:
|
||||
return False
|
||||
return True, ""
|
||||
except GitRunError as exc:
|
||||
rc = getattr(exc, "returncode", None)
|
||||
stderr = getattr(exc, "stderr", "") or ""
|
||||
stdout = getattr(exc, "stdout", "") or ""
|
||||
|
||||
# Important: `git ls-remote --exit-code` uses exit code 2 when no refs match.
|
||||
# For a completely empty repo, this can happen even though auth/transport is OK.
|
||||
if rc == 2 and not _looks_like_real_transport_error(stderr + "\n" + stdout):
|
||||
return True, "remote reachable, but no refs found yet (empty repository)"
|
||||
|
||||
return False, _format_reason(exc, url=url)
|
||||
|
||||
|
||||
def probe_remote_reachable(url: str, cwd: str = ".") -> bool:
|
||||
ok, _ = probe_remote_reachable_detail(url, cwd=cwd)
|
||||
return ok
|
||||
|
||||
@@ -42,16 +42,34 @@ def run(
|
||||
)
|
||||
except subprocess.CalledProcessError as exc:
|
||||
stderr = exc.stderr or ""
|
||||
if _is_not_repo_error(stderr):
|
||||
raise GitNotRepositoryError(
|
||||
f"Not a git repository: {cwd!r}\nCommand: {cmd_str}\nSTDERR:\n{stderr}"
|
||||
) from exc
|
||||
stdout = exc.stdout or ""
|
||||
|
||||
raise GitRunError(
|
||||
if _is_not_repo_error(stderr):
|
||||
err = GitNotRepositoryError(
|
||||
f"Not a git repository: {cwd!r}\nCommand: {cmd_str}\nSTDERR:\n{stderr}"
|
||||
)
|
||||
# Attach details for callers who want to debug
|
||||
err.cwd = cwd
|
||||
err.cmd = cmd
|
||||
err.cmd_str = cmd_str
|
||||
err.returncode = exc.returncode
|
||||
err.stdout = stdout
|
||||
err.stderr = stderr
|
||||
raise err from exc
|
||||
|
||||
err = GitRunError(
|
||||
f"Git command failed in {cwd!r}: {cmd_str}\n"
|
||||
f"Exit code: {exc.returncode}\n"
|
||||
f"STDOUT:\n{exc.stdout}\n"
|
||||
f"STDOUT:\n{stdout}\n"
|
||||
f"STDERR:\n{stderr}"
|
||||
) from exc
|
||||
)
|
||||
# Attach details for callers who want to debug
|
||||
err.cwd = cwd
|
||||
err.cmd = cmd
|
||||
err.cmd_str = cmd_str
|
||||
err.returncode = exc.returncode
|
||||
err.stdout = stdout
|
||||
err.stderr = stderr
|
||||
raise err from exc
|
||||
|
||||
return result.stdout.strip()
|
||||
|
||||
Reference in New Issue
Block a user