Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f66af0157b | ||
|
|
b0b3ccf5aa | ||
|
|
e178afde31 | ||
|
|
9802293871 | ||
|
|
a2138c9985 | ||
|
|
10998e50ad |
8
.github/workflows/test-virgin-user.yml
vendored
8
.github/workflows/test-virgin-user.yml
vendored
@@ -49,11 +49,13 @@ jobs:
|
||||
chown -R dev:dev /nix
|
||||
chmod 0755 /nix
|
||||
chmod 1777 /nix/store
|
||||
|
||||
sudo -H -u dev env HOME=/home/dev PKGMGR_DISABLE_NIX_FLAKE_INSTALLER=1 bash -lc "
|
||||
sudo -H -u dev env \
|
||||
HOME=/home/dev \
|
||||
NIX_CONFIG="$NIX_CONFIG" \
|
||||
PKGMGR_DISABLE_NIX_FLAKE_INSTALLER=1 \
|
||||
bash -lc "
|
||||
set -euo pipefail
|
||||
cd /opt/src/pkgmgr
|
||||
|
||||
make setup-venv
|
||||
. \"\$HOME/.venvs/pkgmgr/bin/activate\"
|
||||
|
||||
|
||||
12
CHANGELOG.md
12
CHANGELOG.md
@@ -1,3 +1,15 @@
|
||||
## [1.9.1] - 2025-12-21
|
||||
|
||||
* Fixed installation issues and improved loading of default configuration files.
|
||||
|
||||
|
||||
## [1.9.0] - 2025-12-20
|
||||
|
||||
* * New ***mirror visibility*** command to set remote Git repositories to ***public*** or ***private***.
|
||||
* New ***--public*** flag for ***mirror provision*** to create repositories and immediately make them public.
|
||||
* All configured git mirrors are now provisioned.
|
||||
|
||||
|
||||
## [1.8.7] - 2025-12-19
|
||||
|
||||
* * **Release version updates now correctly modify ***pyproject.toml*** files that follow PEP 621**, ensuring the ***[project].version*** field is updated as expected.
|
||||
|
||||
@@ -32,7 +32,7 @@
|
||||
rec {
|
||||
pkgmgr = pyPkgs.buildPythonApplication {
|
||||
pname = "package-manager";
|
||||
version = "1.8.7";
|
||||
version = "1.9.1";
|
||||
|
||||
# Use the git repo as source
|
||||
src = ./.;
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
# Maintainer: Kevin Veen-Birkenbach <info@veen.world>
|
||||
|
||||
pkgname=package-manager
|
||||
pkgver=1.8.7
|
||||
pkgver=1.9.1
|
||||
pkgrel=1
|
||||
pkgdesc="Local-flake wrapper for Kevin's package-manager (Nix-based)."
|
||||
arch=('any')
|
||||
|
||||
@@ -1,3 +1,17 @@
|
||||
package-manager (1.9.1-1) unstable; urgency=medium
|
||||
|
||||
* Fixed installation issues and improved loading of default configuration files.
|
||||
|
||||
-- Kevin Veen-Birkenbach <kevin@veen.world> Sun, 21 Dec 2025 13:38:58 +0100
|
||||
|
||||
package-manager (1.9.0-1) unstable; urgency=medium
|
||||
|
||||
* * New ***mirror visibility*** command to set remote Git repositories to ***public*** or ***private***.
|
||||
* New ***--public*** flag for ***mirror provision*** to create repositories and immediately make them public.
|
||||
* All configured git mirrors are now provisioned.
|
||||
|
||||
-- Kevin Veen-Birkenbach <kevin@veen.world> Sat, 20 Dec 2025 14:37:58 +0100
|
||||
|
||||
package-manager (1.8.7-1) unstable; urgency=medium
|
||||
|
||||
* * **Release version updates now correctly modify ***pyproject.toml*** files that follow PEP 621**, ensuring the ***[project].version*** field is updated as expected.
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
Name: package-manager
|
||||
Version: 1.8.7
|
||||
Version: 1.9.1
|
||||
Release: 1%{?dist}
|
||||
Summary: Wrapper that runs Kevin's package-manager via Nix flake
|
||||
|
||||
@@ -74,6 +74,14 @@ echo ">>> package-manager removed. Nix itself was not removed."
|
||||
/usr/lib/package-manager/
|
||||
|
||||
%changelog
|
||||
* Sun Dec 21 2025 Kevin Veen-Birkenbach <kevin@veen.world> - 1.9.1-1
|
||||
- Fixed installation issues and improved loading of default configuration files.
|
||||
|
||||
* Sat Dec 20 2025 Kevin Veen-Birkenbach <kevin@veen.world> - 1.9.0-1
|
||||
- * New ***mirror visibility*** command to set remote Git repositories to ***public*** or ***private***.
|
||||
* New ***--public*** flag for ***mirror provision*** to create repositories and immediately make them public.
|
||||
* All configured git mirrors are now provisioned.
|
||||
|
||||
* Fri Dec 19 2025 Kevin Veen-Birkenbach <kevin@veen.world> - 1.8.7-1
|
||||
- * **Release version updates now correctly modify ***pyproject.toml*** files that follow PEP 621**, ensuring the ***[project].version*** field is updated as expected.
|
||||
* **Invalid or incomplete ***pyproject.toml*** files are now handled gracefully** with clear error messages instead of abrupt process termination.
|
||||
|
||||
@@ -7,7 +7,7 @@ build-backend = "setuptools.build_meta"
|
||||
|
||||
[project]
|
||||
name = "kpmx"
|
||||
version = "1.8.7"
|
||||
version = "1.9.1"
|
||||
description = "Kevin's package-manager tool (pkgmgr)"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.9"
|
||||
@@ -43,11 +43,11 @@ pkgmgr = "pkgmgr.cli:main"
|
||||
# -----------------------------
|
||||
# Source layout: all packages live under "src/"
|
||||
[tool.setuptools]
|
||||
package-dir = { "" = "src", "config" = "config" }
|
||||
package-dir = { "" = "src" }
|
||||
|
||||
[tool.setuptools.packages.find]
|
||||
where = ["src", "."]
|
||||
include = ["pkgmgr*", "config*"]
|
||||
where = ["src"]
|
||||
include = ["pkgmgr*"]
|
||||
|
||||
[tool.setuptools.package-data]
|
||||
"config" = ["defaults.yaml"]
|
||||
|
||||
@@ -14,6 +14,7 @@ from .list_cmd import list_mirrors
|
||||
from .diff_cmd import diff_mirrors
|
||||
from .merge_cmd import merge_mirrors
|
||||
from .setup_cmd import setup_mirrors
|
||||
from .visibility_cmd import set_mirror_visibility
|
||||
|
||||
__all__ = [
|
||||
"Repository",
|
||||
@@ -22,4 +23,5 @@ __all__ = [
|
||||
"diff_mirrors",
|
||||
"merge_mirrors",
|
||||
"setup_mirrors",
|
||||
"set_mirror_visibility",
|
||||
]
|
||||
|
||||
@@ -11,35 +11,37 @@ from .types import Repository
|
||||
from .url_utils import normalize_provider_host, parse_repo_from_git_url
|
||||
|
||||
|
||||
def ensure_remote_repository(
|
||||
repo: Repository,
|
||||
repositories_base_dir: str,
|
||||
all_repos: List[Repository],
|
||||
def _provider_hint_from_host(host: str) -> str | None:
|
||||
h = (host or "").lower()
|
||||
if h == "github.com":
|
||||
return "github"
|
||||
# Best-effort default for self-hosted git domains
|
||||
return "gitea" if h else None
|
||||
|
||||
|
||||
def ensure_remote_repository_for_url(
|
||||
*,
|
||||
url: str,
|
||||
private_default: bool,
|
||||
description: str,
|
||||
preview: bool,
|
||||
) -> None:
|
||||
ctx = build_context(repo, repositories_base_dir, all_repos)
|
||||
|
||||
primary_url = determine_primary_remote_url(repo, ctx)
|
||||
if not primary_url:
|
||||
print("[INFO] No primary URL found; skipping remote provisioning.")
|
||||
return
|
||||
|
||||
host_raw, owner, name = parse_repo_from_git_url(primary_url)
|
||||
host_raw, owner, name = parse_repo_from_git_url(url)
|
||||
host = normalize_provider_host(host_raw)
|
||||
|
||||
if not host or not owner or not name:
|
||||
print("[WARN] Could not parse remote URL:", primary_url)
|
||||
print(f"[WARN] Could not parse repo from URL: {url}")
|
||||
return
|
||||
|
||||
spec = RepoSpec(
|
||||
host=host,
|
||||
owner=owner,
|
||||
name=name,
|
||||
private=bool(repo.get("private", True)),
|
||||
description=str(repo.get("description", "")),
|
||||
private=private_default,
|
||||
description=description,
|
||||
)
|
||||
|
||||
provider_kind = str(repo.get("provider", "")).lower() or None
|
||||
provider_kind = _provider_hint_from_host(host)
|
||||
|
||||
try:
|
||||
result = ensure_remote_repo(
|
||||
@@ -56,4 +58,29 @@ def ensure_remote_repository(
|
||||
if result.url:
|
||||
print(f"[REMOTE ENSURE] URL: {result.url}")
|
||||
except Exception as exc: # noqa: BLE001
|
||||
print(f"[ERROR] Remote provisioning failed: {exc}")
|
||||
print(f"[ERROR] Remote provisioning failed for {url!r}: {exc}")
|
||||
|
||||
|
||||
def ensure_remote_repository(
|
||||
repo: Repository,
|
||||
repositories_base_dir: str,
|
||||
all_repos: List[Repository],
|
||||
preview: bool,
|
||||
) -> None:
|
||||
"""
|
||||
Backwards-compatible wrapper: ensure the *primary* remote repository
|
||||
derived from the primary URL.
|
||||
"""
|
||||
ctx = build_context(repo, repositories_base_dir, all_repos)
|
||||
|
||||
primary_url = determine_primary_remote_url(repo, ctx)
|
||||
if not primary_url:
|
||||
print("[INFO] No primary URL found; skipping remote provisioning.")
|
||||
return
|
||||
|
||||
ensure_remote_repository_for_url(
|
||||
url=primary_url,
|
||||
private_default=bool(repo.get("private", True)),
|
||||
description=str(repo.get("description", "")),
|
||||
preview=preview,
|
||||
)
|
||||
|
||||
@@ -2,12 +2,15 @@ from __future__ import annotations
|
||||
|
||||
from typing import List
|
||||
|
||||
from pkgmgr.core.git.queries import probe_remote_reachable
|
||||
from pkgmgr.core.git.queries import probe_remote_reachable_detail
|
||||
from pkgmgr.core.remote_provisioning import ProviderHint, RepoSpec, set_repo_visibility
|
||||
from pkgmgr.core.remote_provisioning.visibility import VisibilityOptions
|
||||
|
||||
from .context import build_context
|
||||
from .git_remote import ensure_origin_remote, determine_primary_remote_url
|
||||
from .remote_provision import ensure_remote_repository
|
||||
from .git_remote import determine_primary_remote_url, ensure_origin_remote
|
||||
from .remote_provision import ensure_remote_repository_for_url
|
||||
from .types import Repository
|
||||
from .url_utils import normalize_provider_host, parse_repo_from_git_url
|
||||
|
||||
|
||||
def _is_git_remote_url(url: str) -> bool:
|
||||
@@ -25,6 +28,64 @@ def _is_git_remote_url(url: str) -> bool:
|
||||
return False
|
||||
|
||||
|
||||
def _provider_hint_from_host(host: str) -> str | None:
|
||||
h = (host or "").lower()
|
||||
if h == "github.com":
|
||||
return "github"
|
||||
return "gitea" if h else None
|
||||
|
||||
|
||||
def _apply_visibility_for_url(
|
||||
*,
|
||||
url: str,
|
||||
private: bool,
|
||||
description: str,
|
||||
preview: bool,
|
||||
) -> None:
|
||||
host_raw, owner, name = parse_repo_from_git_url(url)
|
||||
host = normalize_provider_host(host_raw)
|
||||
|
||||
if not host or not owner or not name:
|
||||
print(f"[WARN] Could not parse repo from URL: {url}")
|
||||
return
|
||||
|
||||
spec = RepoSpec(
|
||||
host=host,
|
||||
owner=owner,
|
||||
name=name,
|
||||
private=private,
|
||||
description=description,
|
||||
)
|
||||
|
||||
provider_kind = _provider_hint_from_host(host)
|
||||
res = set_repo_visibility(
|
||||
spec,
|
||||
private=private,
|
||||
provider_hint=ProviderHint(kind=provider_kind),
|
||||
options=VisibilityOptions(preview=preview),
|
||||
)
|
||||
print(f"[REMOTE VISIBILITY] {res.status.upper()}: {res.message}")
|
||||
|
||||
|
||||
def _print_probe_result(name: str | None, url: str, *, cwd: str) -> None:
|
||||
"""
|
||||
Print probe result for a git remote URL, including a short failure reason.
|
||||
"""
|
||||
ok, reason = probe_remote_reachable_detail(url, cwd=cwd)
|
||||
|
||||
prefix = f"{name}: " if name else ""
|
||||
if ok:
|
||||
print(f"[OK] {prefix}{url}")
|
||||
return
|
||||
|
||||
print(f"[WARN] {prefix}{url}")
|
||||
if reason:
|
||||
reason = reason.strip()
|
||||
if len(reason) > 240:
|
||||
reason = reason[:240].rstrip() + "…"
|
||||
print(f" reason: {reason}")
|
||||
|
||||
|
||||
def _setup_local_mirrors_for_repo(
|
||||
repo: Repository,
|
||||
repositories_base_dir: str,
|
||||
@@ -48,6 +109,7 @@ def _setup_remote_mirrors_for_repo(
|
||||
all_repos: List[Repository],
|
||||
preview: bool,
|
||||
ensure_remote: bool,
|
||||
ensure_visibility: str | None,
|
||||
) -> None:
|
||||
ctx = build_context(repo, repositories_base_dir, all_repos)
|
||||
|
||||
@@ -56,35 +118,78 @@ def _setup_remote_mirrors_for_repo(
|
||||
print(f"[MIRROR SETUP:REMOTE] dir: {ctx.repo_dir}")
|
||||
print("------------------------------------------------------------")
|
||||
|
||||
if ensure_remote:
|
||||
ensure_remote_repository(
|
||||
repo,
|
||||
repositories_base_dir,
|
||||
all_repos,
|
||||
preview,
|
||||
)
|
||||
|
||||
# Probe only git URLs (do not try ls-remote against PyPI etc.)
|
||||
# If there are no mirrors at all, probe the primary git URL.
|
||||
git_mirrors = {
|
||||
k: v for k, v in ctx.resolved_mirrors.items() if _is_git_remote_url(v)
|
||||
}
|
||||
|
||||
def _desired_private_default() -> bool:
|
||||
# default behavior: repo['private'] (or True)
|
||||
if ensure_visibility == "public":
|
||||
return False
|
||||
if ensure_visibility == "private":
|
||||
return True
|
||||
return bool(repo.get("private", True))
|
||||
|
||||
def _should_enforce_visibility() -> bool:
|
||||
return ensure_visibility in ("public", "private")
|
||||
|
||||
def _visibility_private_value() -> bool:
|
||||
return ensure_visibility == "private"
|
||||
|
||||
description = str(repo.get("description", ""))
|
||||
|
||||
# If there are no git mirrors, fall back to primary (git) URL.
|
||||
if not git_mirrors:
|
||||
primary = determine_primary_remote_url(repo, ctx)
|
||||
if not primary or not _is_git_remote_url(primary):
|
||||
print("[INFO] No git mirrors to probe.")
|
||||
print("[INFO] No git mirrors to probe or provision.")
|
||||
print()
|
||||
return
|
||||
|
||||
ok = probe_remote_reachable(primary, cwd=ctx.repo_dir)
|
||||
print("[OK]" if ok else "[WARN]", primary)
|
||||
if ensure_remote:
|
||||
print(f"[REMOTE ENSURE] ensuring primary: {primary}")
|
||||
ensure_remote_repository_for_url(
|
||||
url=primary,
|
||||
private_default=_desired_private_default(),
|
||||
description=description,
|
||||
preview=preview,
|
||||
)
|
||||
# IMPORTANT: enforce visibility only if requested
|
||||
if _should_enforce_visibility():
|
||||
_apply_visibility_for_url(
|
||||
url=primary,
|
||||
private=_visibility_private_value(),
|
||||
description=description,
|
||||
preview=preview,
|
||||
)
|
||||
print()
|
||||
|
||||
_print_probe_result(None, primary, cwd=ctx.repo_dir)
|
||||
print()
|
||||
return
|
||||
|
||||
# Provision ALL git mirrors (if requested)
|
||||
if ensure_remote:
|
||||
for name, url in git_mirrors.items():
|
||||
ok = probe_remote_reachable(url, cwd=ctx.repo_dir)
|
||||
print(f"[OK] {name}: {url}" if ok else f"[WARN] {name}: {url}")
|
||||
print(f"[REMOTE ENSURE] ensuring mirror {name!r}: {url}")
|
||||
ensure_remote_repository_for_url(
|
||||
url=url,
|
||||
private_default=_desired_private_default(),
|
||||
description=description,
|
||||
preview=preview,
|
||||
)
|
||||
if _should_enforce_visibility():
|
||||
_apply_visibility_for_url(
|
||||
url=url,
|
||||
private=_visibility_private_value(),
|
||||
description=description,
|
||||
preview=preview,
|
||||
)
|
||||
print()
|
||||
|
||||
# Probe ALL git mirrors
|
||||
for name, url in git_mirrors.items():
|
||||
_print_probe_result(name, url, cwd=ctx.repo_dir)
|
||||
|
||||
print()
|
||||
|
||||
@@ -97,6 +202,7 @@ def setup_mirrors(
|
||||
local: bool = True,
|
||||
remote: bool = True,
|
||||
ensure_remote: bool = False,
|
||||
ensure_visibility: str | None = None,
|
||||
) -> None:
|
||||
for repo in selected_repos:
|
||||
if local:
|
||||
@@ -114,4 +220,5 @@ def setup_mirrors(
|
||||
all_repos,
|
||||
preview,
|
||||
ensure_remote,
|
||||
ensure_visibility,
|
||||
)
|
||||
|
||||
134
src/pkgmgr/actions/mirror/visibility_cmd.py
Normal file
134
src/pkgmgr/actions/mirror/visibility_cmd.py
Normal file
@@ -0,0 +1,134 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import List
|
||||
|
||||
from pkgmgr.core.remote_provisioning import ProviderHint, RepoSpec, set_repo_visibility
|
||||
from pkgmgr.core.remote_provisioning.visibility import VisibilityOptions
|
||||
|
||||
from .context import build_context
|
||||
from .git_remote import determine_primary_remote_url
|
||||
from .types import Repository
|
||||
from .url_utils import normalize_provider_host, parse_repo_from_git_url
|
||||
|
||||
|
||||
def _is_git_remote_url(url: str) -> bool:
|
||||
# Keep same semantics as setup_cmd.py / git_remote.py
|
||||
u = (url or "").strip()
|
||||
if not u:
|
||||
return False
|
||||
if u.startswith("git@"):
|
||||
return True
|
||||
if u.startswith("ssh://"):
|
||||
return True
|
||||
if (u.startswith("https://") or u.startswith("http://")) and u.endswith(".git"):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def _provider_hint_from_host(host: str) -> str | None:
|
||||
h = (host or "").lower()
|
||||
if h == "github.com":
|
||||
return "github"
|
||||
# Best-effort default for self-hosted git domains
|
||||
return "gitea" if h else None
|
||||
|
||||
|
||||
def _apply_visibility_for_url(
|
||||
*,
|
||||
url: str,
|
||||
private: bool,
|
||||
description: str,
|
||||
preview: bool,
|
||||
) -> None:
|
||||
host_raw, owner, name = parse_repo_from_git_url(url)
|
||||
host = normalize_provider_host(host_raw)
|
||||
|
||||
if not host or not owner or not name:
|
||||
print(f"[WARN] Could not parse repo from URL: {url}")
|
||||
return
|
||||
|
||||
spec = RepoSpec(
|
||||
host=host,
|
||||
owner=owner,
|
||||
name=name,
|
||||
private=private,
|
||||
description=description,
|
||||
)
|
||||
|
||||
provider_kind = _provider_hint_from_host(host)
|
||||
res = set_repo_visibility(
|
||||
spec,
|
||||
private=private,
|
||||
provider_hint=ProviderHint(kind=provider_kind),
|
||||
options=VisibilityOptions(preview=preview),
|
||||
)
|
||||
print(f"[REMOTE VISIBILITY] {res.status.upper()}: {res.message}")
|
||||
|
||||
|
||||
def set_mirror_visibility(
|
||||
selected_repos: List[Repository],
|
||||
repositories_base_dir: str,
|
||||
all_repos: List[Repository],
|
||||
*,
|
||||
visibility: str,
|
||||
preview: bool = False,
|
||||
) -> None:
|
||||
"""
|
||||
Set remote repository visibility for all git mirrors of each selected repo.
|
||||
|
||||
visibility:
|
||||
- "private"
|
||||
- "public"
|
||||
"""
|
||||
v = (visibility or "").strip().lower()
|
||||
if v not in ("private", "public"):
|
||||
raise ValueError("visibility must be 'private' or 'public'")
|
||||
|
||||
desired_private = v == "private"
|
||||
|
||||
for repo in selected_repos:
|
||||
ctx = build_context(repo, repositories_base_dir, all_repos)
|
||||
|
||||
print("------------------------------------------------------------")
|
||||
print(f"[MIRROR VISIBILITY] {ctx.identifier}")
|
||||
print(f"[MIRROR VISIBILITY] dir: {ctx.repo_dir}")
|
||||
print(f"[MIRROR VISIBILITY] target: {v}")
|
||||
print("------------------------------------------------------------")
|
||||
|
||||
git_mirrors = {
|
||||
name: url
|
||||
for name, url in ctx.resolved_mirrors.items()
|
||||
if url and _is_git_remote_url(url)
|
||||
}
|
||||
|
||||
# If there are no git mirrors, fall back to primary (git) URL.
|
||||
if not git_mirrors:
|
||||
primary = determine_primary_remote_url(repo, ctx)
|
||||
if not primary or not _is_git_remote_url(primary):
|
||||
print(
|
||||
"[INFO] No git mirrors found (and no primary git URL). Nothing to do."
|
||||
)
|
||||
print()
|
||||
continue
|
||||
|
||||
print(f"[MIRROR VISIBILITY] applying to primary: {primary}")
|
||||
_apply_visibility_for_url(
|
||||
url=primary,
|
||||
private=desired_private,
|
||||
description=str(repo.get("description", "")),
|
||||
preview=preview,
|
||||
)
|
||||
print()
|
||||
continue
|
||||
|
||||
# Apply to ALL git mirrors
|
||||
for name, url in git_mirrors.items():
|
||||
print(f"[MIRROR VISIBILITY] applying to mirror {name!r}: {url}")
|
||||
_apply_visibility_for_url(
|
||||
url=url,
|
||||
private=desired_private,
|
||||
description=str(repo.get("description", "")),
|
||||
preview=preview,
|
||||
)
|
||||
|
||||
print()
|
||||
@@ -42,9 +42,7 @@ def _find_defaults_source_dir() -> Optional[str]:
|
||||
project root that contains default config files.
|
||||
|
||||
Preferred locations (in dieser Reihenfolge):
|
||||
- <pkg_root>/config_defaults
|
||||
- <pkg_root>/config
|
||||
- <project_root>/config_defaults
|
||||
- <project_root>/config
|
||||
"""
|
||||
import pkgmgr # local import to avoid circular deps
|
||||
@@ -53,9 +51,7 @@ def _find_defaults_source_dir() -> Optional[str]:
|
||||
project_root = pkg_root.parent
|
||||
|
||||
candidates = [
|
||||
pkg_root / "config_defaults",
|
||||
pkg_root / "config",
|
||||
project_root / "config_defaults",
|
||||
project_root / "config",
|
||||
]
|
||||
for cand in candidates:
|
||||
@@ -73,7 +69,7 @@ def _update_default_configs(user_config_path: str) -> None:
|
||||
source_dir = _find_defaults_source_dir()
|
||||
if not source_dir:
|
||||
print(
|
||||
"[WARN] No config_defaults or config directory found in "
|
||||
"[WARN] No config directory found in "
|
||||
"pkgmgr installation. Nothing to update."
|
||||
)
|
||||
return
|
||||
|
||||
@@ -8,6 +8,7 @@ from pkgmgr.actions.mirror import (
|
||||
diff_mirrors,
|
||||
list_mirrors,
|
||||
merge_mirrors,
|
||||
set_mirror_visibility,
|
||||
setup_mirrors,
|
||||
)
|
||||
from pkgmgr.cli.context import CLIContext
|
||||
@@ -30,6 +31,7 @@ def handle_mirror_command(
|
||||
- mirror setup
|
||||
- mirror check
|
||||
- mirror provision
|
||||
- mirror visibility
|
||||
"""
|
||||
if not selected:
|
||||
print("[INFO] No repositories selected for 'mirror' command.")
|
||||
@@ -92,6 +94,7 @@ def handle_mirror_command(
|
||||
local=True,
|
||||
remote=False,
|
||||
ensure_remote=False,
|
||||
ensure_visibility=None,
|
||||
)
|
||||
return
|
||||
|
||||
@@ -105,11 +108,14 @@ def handle_mirror_command(
|
||||
local=False,
|
||||
remote=True,
|
||||
ensure_remote=False,
|
||||
ensure_visibility=None,
|
||||
)
|
||||
return
|
||||
|
||||
if subcommand == "provision":
|
||||
preview = getattr(args, "preview", False)
|
||||
public = bool(getattr(args, "public", False))
|
||||
|
||||
setup_mirrors(
|
||||
selected_repos=selected,
|
||||
repositories_base_dir=ctx.repositories_base_dir,
|
||||
@@ -118,6 +124,23 @@ def handle_mirror_command(
|
||||
local=False,
|
||||
remote=True,
|
||||
ensure_remote=True,
|
||||
ensure_visibility="public" if public else None,
|
||||
)
|
||||
return
|
||||
|
||||
if subcommand == "visibility":
|
||||
preview = getattr(args, "preview", False)
|
||||
visibility = getattr(args, "visibility", None)
|
||||
if visibility not in ("private", "public"):
|
||||
print("[ERROR] mirror visibility expects 'private' or 'public'.")
|
||||
sys.exit(2)
|
||||
|
||||
set_mirror_visibility(
|
||||
selected_repos=selected,
|
||||
repositories_base_dir=ctx.repositories_base_dir,
|
||||
all_repos=ctx.all_repositories,
|
||||
visibility=visibility,
|
||||
preview=preview,
|
||||
)
|
||||
return
|
||||
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
# src/pkgmgr/cli/parser/mirror_cmd.py
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
@@ -12,7 +11,7 @@ from .common import add_identifier_arguments
|
||||
def add_mirror_subparsers(subparsers: argparse._SubParsersAction) -> None:
|
||||
mirror_parser = subparsers.add_parser(
|
||||
"mirror",
|
||||
help="Mirror-related utilities (list, diff, merge, setup, check, provision)",
|
||||
help="Mirror-related utilities (list, diff, merge, setup, check, provision, visibility)",
|
||||
)
|
||||
mirror_subparsers = mirror_parser.add_subparsers(
|
||||
dest="subcommand",
|
||||
@@ -68,4 +67,20 @@ def add_mirror_subparsers(subparsers: argparse._SubParsersAction) -> None:
|
||||
"provision",
|
||||
help="Provision remote repositories via provider APIs (create missing repos).",
|
||||
)
|
||||
mirror_provision.add_argument(
|
||||
"--public",
|
||||
action="store_true",
|
||||
help="After ensuring repos exist, enforce public visibility on the remote provider.",
|
||||
)
|
||||
add_identifier_arguments(mirror_provision)
|
||||
|
||||
mirror_visibility = mirror_subparsers.add_parser(
|
||||
"visibility",
|
||||
help="Set visibility (public/private) for all remote git mirrors via provider APIs.",
|
||||
)
|
||||
mirror_visibility.add_argument(
|
||||
"visibility",
|
||||
choices=["private", "public"],
|
||||
help="Target visibility for all git mirrors.",
|
||||
)
|
||||
add_identifier_arguments(mirror_visibility)
|
||||
|
||||
@@ -14,9 +14,7 @@ Layering rules:
|
||||
- Falls dort keine passenden Dateien existieren, wird auf die im
|
||||
Paket / Projekt mitgelieferten Config-Verzeichnisse zurückgegriffen:
|
||||
|
||||
<pkg_root>/config_defaults
|
||||
<pkg_root>/config
|
||||
<project_root>/config_defaults
|
||||
<project_root>/config
|
||||
|
||||
Dabei werden ebenfalls alle *.yml/*.yaml als Layer geladen.
|
||||
@@ -218,7 +216,6 @@ def _load_defaults_from_package_or_project() -> Dict[str, Any]:
|
||||
# Candidate config dirs
|
||||
candidates = []
|
||||
for root in roots:
|
||||
candidates.append(root / "config_defaults")
|
||||
candidates.append(root / "config")
|
||||
|
||||
for cand in candidates:
|
||||
|
||||
@@ -20,7 +20,10 @@ from .get_tags_at_ref import GitTagsAtRefQueryError, get_tags_at_ref
|
||||
from .get_upstream_ref import get_upstream_ref
|
||||
from .list_remotes import list_remotes
|
||||
from .list_tags import list_tags
|
||||
from .probe_remote_reachable import probe_remote_reachable
|
||||
from .probe_remote_reachable import (
|
||||
probe_remote_reachable,
|
||||
probe_remote_reachable_detail,
|
||||
)
|
||||
from .resolve_base_branch import GitBaseBranchNotFoundError, resolve_base_branch
|
||||
|
||||
__all__ = [
|
||||
@@ -37,6 +40,7 @@ __all__ = [
|
||||
"list_remotes",
|
||||
"get_remote_push_urls",
|
||||
"probe_remote_reachable",
|
||||
"probe_remote_reachable_detail",
|
||||
"get_changelog",
|
||||
"GitChangelogQueryError",
|
||||
"get_tags_at_ref",
|
||||
|
||||
@@ -1,21 +1,121 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Tuple
|
||||
|
||||
from ..errors import GitRunError
|
||||
from ..run import run
|
||||
|
||||
|
||||
def probe_remote_reachable(url: str, cwd: str = ".") -> bool:
|
||||
def _first_useful_line(text: str) -> str:
|
||||
lines: list[str] = []
|
||||
for line in (text or "").splitlines():
|
||||
s = line.strip()
|
||||
if s:
|
||||
lines.append(s)
|
||||
|
||||
if not lines:
|
||||
return ""
|
||||
|
||||
preferred_keywords = (
|
||||
"fatal:",
|
||||
"permission denied",
|
||||
"repository not found",
|
||||
"could not read from remote repository",
|
||||
"connection refused",
|
||||
"connection timed out",
|
||||
"no route to host",
|
||||
"name or service not known",
|
||||
"temporary failure in name resolution",
|
||||
"host key verification failed",
|
||||
"could not resolve hostname",
|
||||
"authentication failed",
|
||||
"publickey",
|
||||
"the authenticity of host",
|
||||
"known_hosts",
|
||||
)
|
||||
for s in lines:
|
||||
low = s.lower()
|
||||
if any(k in low for k in preferred_keywords):
|
||||
return s
|
||||
|
||||
# Avoid returning a meaningless "error:" if possible
|
||||
for s in lines:
|
||||
if s.lower() not in ("error:", "error"):
|
||||
return s
|
||||
|
||||
return lines[0]
|
||||
|
||||
|
||||
def _looks_like_real_transport_error(text: str) -> bool:
|
||||
"""
|
||||
Check whether a remote URL is reachable.
|
||||
True if stderr/stdout contains strong indicators that the remote is NOT usable.
|
||||
"""
|
||||
low = (text or "").lower()
|
||||
indicators = (
|
||||
"repository not found",
|
||||
"could not read from remote repository",
|
||||
"permission denied",
|
||||
"authentication failed",
|
||||
"publickey",
|
||||
"host key verification failed",
|
||||
"could not resolve hostname",
|
||||
"name or service not known",
|
||||
"connection refused",
|
||||
"connection timed out",
|
||||
"no route to host",
|
||||
)
|
||||
return any(i in low for i in indicators)
|
||||
|
||||
Equivalent to:
|
||||
git ls-remote --exit-code <url>
|
||||
|
||||
Returns:
|
||||
True if reachable, False otherwise.
|
||||
def _format_reason(exc: GitRunError, *, url: str) -> str:
|
||||
stderr = getattr(exc, "stderr", "") or ""
|
||||
stdout = getattr(exc, "stdout", "") or ""
|
||||
rc = getattr(exc, "returncode", None)
|
||||
|
||||
reason = (
|
||||
_first_useful_line(stderr)
|
||||
or _first_useful_line(stdout)
|
||||
or _first_useful_line(str(exc))
|
||||
)
|
||||
|
||||
if rc is not None:
|
||||
reason = f"(exit {rc}) {reason}".strip() if reason else f"(exit {rc})"
|
||||
|
||||
# If we still have nothing useful, provide a hint to debug SSH transport
|
||||
if not reason or reason.lower() in ("(exit 2)", "(exit 128)"):
|
||||
reason = (
|
||||
f"{reason} | hint: run "
|
||||
f"GIT_SSH_COMMAND='ssh -vvv' git ls-remote --exit-code {url!r}"
|
||||
).strip()
|
||||
|
||||
return reason.strip()
|
||||
|
||||
|
||||
def probe_remote_reachable_detail(url: str, cwd: str = ".") -> Tuple[bool, str]:
|
||||
"""
|
||||
Probe whether a remote URL is reachable.
|
||||
|
||||
Implementation detail:
|
||||
- We run `git ls-remote --exit-code <url>`.
|
||||
- Git may return exit code 2 when the remote is reachable but no refs exist
|
||||
(e.g. an empty repository). We treat that as reachable.
|
||||
"""
|
||||
try:
|
||||
run(["ls-remote", "--exit-code", url], cwd=cwd)
|
||||
return True
|
||||
except GitRunError:
|
||||
return False
|
||||
return True, ""
|
||||
except GitRunError as exc:
|
||||
rc = getattr(exc, "returncode", None)
|
||||
stderr = getattr(exc, "stderr", "") or ""
|
||||
stdout = getattr(exc, "stdout", "") or ""
|
||||
|
||||
# Important: `git ls-remote --exit-code` uses exit code 2 when no refs match.
|
||||
# For a completely empty repo, this can happen even though auth/transport is OK.
|
||||
if rc == 2 and not _looks_like_real_transport_error(stderr + "\n" + stdout):
|
||||
return True, "remote reachable, but no refs found yet (empty repository)"
|
||||
|
||||
return False, _format_reason(exc, url=url)
|
||||
|
||||
|
||||
def probe_remote_reachable(url: str, cwd: str = ".") -> bool:
|
||||
ok, _ = probe_remote_reachable_detail(url, cwd=cwd)
|
||||
return ok
|
||||
|
||||
@@ -42,16 +42,34 @@ def run(
|
||||
)
|
||||
except subprocess.CalledProcessError as exc:
|
||||
stderr = exc.stderr or ""
|
||||
if _is_not_repo_error(stderr):
|
||||
raise GitNotRepositoryError(
|
||||
f"Not a git repository: {cwd!r}\nCommand: {cmd_str}\nSTDERR:\n{stderr}"
|
||||
) from exc
|
||||
stdout = exc.stdout or ""
|
||||
|
||||
raise GitRunError(
|
||||
if _is_not_repo_error(stderr):
|
||||
err = GitNotRepositoryError(
|
||||
f"Not a git repository: {cwd!r}\nCommand: {cmd_str}\nSTDERR:\n{stderr}"
|
||||
)
|
||||
# Attach details for callers who want to debug
|
||||
err.cwd = cwd
|
||||
err.cmd = cmd
|
||||
err.cmd_str = cmd_str
|
||||
err.returncode = exc.returncode
|
||||
err.stdout = stdout
|
||||
err.stderr = stderr
|
||||
raise err from exc
|
||||
|
||||
err = GitRunError(
|
||||
f"Git command failed in {cwd!r}: {cmd_str}\n"
|
||||
f"Exit code: {exc.returncode}\n"
|
||||
f"STDOUT:\n{exc.stdout}\n"
|
||||
f"STDOUT:\n{stdout}\n"
|
||||
f"STDERR:\n{stderr}"
|
||||
) from exc
|
||||
)
|
||||
# Attach details for callers who want to debug
|
||||
err.cwd = cwd
|
||||
err.cmd = cmd
|
||||
err.cmd_str = cmd_str
|
||||
err.returncode = exc.returncode
|
||||
err.stdout = stdout
|
||||
err.stderr = stderr
|
||||
raise err from exc
|
||||
|
||||
return result.stdout.strip()
|
||||
|
||||
@@ -1,12 +1,13 @@
|
||||
# src/pkgmgr/core/remote_provisioning/__init__.py
|
||||
"""Remote repository provisioning (ensure remote repo exists)."""
|
||||
|
||||
from .ensure import ensure_remote_repo
|
||||
from .registry import ProviderRegistry
|
||||
from .types import EnsureResult, ProviderHint, RepoSpec
|
||||
from .visibility import set_repo_visibility
|
||||
|
||||
__all__ = [
|
||||
"ensure_remote_repo",
|
||||
"set_repo_visibility",
|
||||
"RepoSpec",
|
||||
"EnsureResult",
|
||||
"ProviderHint",
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
# src/pkgmgr/core/remote_provisioning/providers/base.py
|
||||
from __future__ import annotations
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
@@ -23,7 +22,26 @@ class RemoteProvider(ABC):
|
||||
def create_repo(self, token: str, spec: RepoSpec) -> EnsureResult:
|
||||
"""Create a repository (owner may be user or org)."""
|
||||
|
||||
@abstractmethod
|
||||
def get_repo_private(self, token: str, spec: RepoSpec) -> bool | None:
|
||||
"""
|
||||
Return current repo privacy, or None if repo not found / inaccessible.
|
||||
|
||||
IMPORTANT:
|
||||
- Must NOT create repositories.
|
||||
- Should return None on 404 (not found) or when the repo cannot be accessed.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def set_repo_private(self, token: str, spec: RepoSpec, *, private: bool) -> None:
|
||||
"""
|
||||
Update repo privacy (PATCH). Must NOT create repositories.
|
||||
|
||||
Implementations should raise HttpError on API failure.
|
||||
"""
|
||||
|
||||
def ensure_repo(self, token: str, spec: RepoSpec) -> EnsureResult:
|
||||
"""Ensure repository exists (create if missing)."""
|
||||
if self.repo_exists(token, spec):
|
||||
return EnsureResult(status="exists", message="Repository exists.")
|
||||
return self.create_repo(token, spec)
|
||||
|
||||
@@ -52,6 +52,39 @@ class GiteaProvider(RemoteProvider):
|
||||
return False
|
||||
raise
|
||||
|
||||
def get_repo_private(self, token: str, spec: RepoSpec) -> bool | None:
|
||||
base = self._api_base(spec.host)
|
||||
url = f"{base}/api/v1/repos/{spec.owner}/{spec.name}"
|
||||
try:
|
||||
resp = self._http.request_json("GET", url, headers=self._headers(token))
|
||||
except HttpError as exc:
|
||||
if exc.status == 404:
|
||||
return None
|
||||
raise
|
||||
|
||||
if not (200 <= resp.status < 300):
|
||||
return None
|
||||
data = resp.json or {}
|
||||
return bool(data.get("private", False))
|
||||
|
||||
def set_repo_private(self, token: str, spec: RepoSpec, *, private: bool) -> None:
|
||||
base = self._api_base(spec.host)
|
||||
url = f"{base}/api/v1/repos/{spec.owner}/{spec.name}"
|
||||
payload: Dict[str, Any] = {"private": bool(private)}
|
||||
|
||||
resp = self._http.request_json(
|
||||
"PATCH",
|
||||
url,
|
||||
headers=self._headers(token),
|
||||
payload=payload,
|
||||
)
|
||||
if not (200 <= resp.status < 300):
|
||||
raise HttpError(
|
||||
status=resp.status,
|
||||
message="Failed to update repository.",
|
||||
body=resp.text,
|
||||
)
|
||||
|
||||
def create_repo(self, token: str, spec: RepoSpec) -> EnsureResult:
|
||||
base = self._api_base(spec.host)
|
||||
|
||||
|
||||
@@ -54,6 +54,39 @@ class GitHubProvider(RemoteProvider):
|
||||
return False
|
||||
raise
|
||||
|
||||
def get_repo_private(self, token: str, spec: RepoSpec) -> bool | None:
|
||||
api = self._api_base(spec.host)
|
||||
url = f"{api}/repos/{spec.owner}/{spec.name}"
|
||||
try:
|
||||
resp = self._http.request_json("GET", url, headers=self._headers(token))
|
||||
except HttpError as exc:
|
||||
if exc.status == 404:
|
||||
return None
|
||||
raise
|
||||
|
||||
if not (200 <= resp.status < 300):
|
||||
return None
|
||||
data = resp.json or {}
|
||||
return bool(data.get("private", False))
|
||||
|
||||
def set_repo_private(self, token: str, spec: RepoSpec, *, private: bool) -> None:
|
||||
api = self._api_base(spec.host)
|
||||
url = f"{api}/repos/{spec.owner}/{spec.name}"
|
||||
payload: Dict[str, Any] = {"private": bool(private)}
|
||||
|
||||
resp = self._http.request_json(
|
||||
"PATCH",
|
||||
url,
|
||||
headers=self._headers(token),
|
||||
payload=payload,
|
||||
)
|
||||
if not (200 <= resp.status < 300):
|
||||
raise HttpError(
|
||||
status=resp.status,
|
||||
message="Failed to update repository.",
|
||||
body=resp.text,
|
||||
)
|
||||
|
||||
def create_repo(self, token: str, spec: RepoSpec) -> EnsureResult:
|
||||
api = self._api_base(spec.host)
|
||||
|
||||
|
||||
@@ -1,10 +1,17 @@
|
||||
# src/pkgmgr/core/remote_provisioning/types.py
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Literal, Optional
|
||||
|
||||
EnsureStatus = Literal["exists", "created", "skipped", "failed"]
|
||||
EnsureStatus = Literal[
|
||||
"exists",
|
||||
"created",
|
||||
"updated",
|
||||
"noop",
|
||||
"notfound",
|
||||
"skipped",
|
||||
"failed",
|
||||
]
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
|
||||
118
src/pkgmgr/core/remote_provisioning/visibility.py
Normal file
118
src/pkgmgr/core/remote_provisioning/visibility.py
Normal file
@@ -0,0 +1,118 @@
|
||||
# src/pkgmgr/core/remote_provisioning/visibility.py
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional
|
||||
|
||||
from pkgmgr.core.credentials.resolver import ResolutionOptions, TokenResolver
|
||||
|
||||
from .http.errors import HttpError
|
||||
from .registry import ProviderRegistry
|
||||
from .types import (
|
||||
AuthError,
|
||||
EnsureResult,
|
||||
NetworkError,
|
||||
PermissionError,
|
||||
ProviderHint,
|
||||
RepoSpec,
|
||||
UnsupportedProviderError,
|
||||
)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class VisibilityOptions:
|
||||
"""Options controlling remote visibility updates."""
|
||||
|
||||
preview: bool = False
|
||||
interactive: bool = True
|
||||
allow_prompt: bool = True
|
||||
save_prompt_token_to_keyring: bool = True
|
||||
|
||||
|
||||
def _raise_mapped_http_error(exc: HttpError, host: str) -> None:
|
||||
"""Map HttpError into domain-specific error types."""
|
||||
if exc.status == 0:
|
||||
raise NetworkError(f"Network error while talking to {host}: {exc}") from exc
|
||||
if exc.status == 401:
|
||||
raise AuthError(f"Authentication failed for {host} (401).") from exc
|
||||
if exc.status == 403:
|
||||
raise PermissionError(f"Permission denied for {host} (403).") from exc
|
||||
|
||||
raise NetworkError(
|
||||
f"HTTP error from {host}: status={exc.status}, message={exc}, body={exc.body}"
|
||||
) from exc
|
||||
|
||||
|
||||
def set_repo_visibility(
|
||||
spec: RepoSpec,
|
||||
*,
|
||||
private: bool,
|
||||
provider_hint: Optional[ProviderHint] = None,
|
||||
options: Optional[VisibilityOptions] = None,
|
||||
registry: Optional[ProviderRegistry] = None,
|
||||
token_resolver: Optional[TokenResolver] = None,
|
||||
) -> EnsureResult:
|
||||
"""
|
||||
Set repository visibility (public/private) WITHOUT creating repositories.
|
||||
|
||||
Behavior:
|
||||
- If repo does not exist -> status=notfound
|
||||
- If already desired -> status=noop
|
||||
- If changed -> status=updated
|
||||
- Respects preview mode -> status=skipped
|
||||
- Maps HTTP errors to domain-specific errors
|
||||
"""
|
||||
opts = options or VisibilityOptions()
|
||||
reg = registry or ProviderRegistry.default()
|
||||
resolver = token_resolver or TokenResolver()
|
||||
|
||||
provider = reg.resolve(spec.host)
|
||||
if provider_hint and provider_hint.kind:
|
||||
forced = provider_hint.kind.strip().lower()
|
||||
forced_provider = next(
|
||||
(p for p in reg.providers if getattr(p, "kind", "").lower() == forced),
|
||||
None,
|
||||
)
|
||||
if forced_provider is not None:
|
||||
provider = forced_provider
|
||||
|
||||
if provider is None:
|
||||
raise UnsupportedProviderError(f"No provider matched host: {spec.host}")
|
||||
|
||||
token_opts = ResolutionOptions(
|
||||
interactive=opts.interactive,
|
||||
allow_prompt=opts.allow_prompt,
|
||||
save_prompt_token_to_keyring=opts.save_prompt_token_to_keyring,
|
||||
)
|
||||
token = resolver.get_token(
|
||||
provider_kind=getattr(provider, "kind", "unknown"),
|
||||
host=spec.host,
|
||||
owner=spec.owner,
|
||||
options=token_opts,
|
||||
)
|
||||
|
||||
if opts.preview:
|
||||
return EnsureResult(
|
||||
status="skipped",
|
||||
message="Preview mode: no remote changes performed.",
|
||||
)
|
||||
|
||||
try:
|
||||
current_private = provider.get_repo_private(token.token, spec)
|
||||
if current_private is None:
|
||||
return EnsureResult(status="notfound", message="Repository not found.")
|
||||
|
||||
if bool(current_private) == bool(private):
|
||||
return EnsureResult(
|
||||
status="noop",
|
||||
message=f"Repository already {'private' if private else 'public'}.",
|
||||
)
|
||||
|
||||
provider.set_repo_private(token.token, spec, private=private)
|
||||
return EnsureResult(
|
||||
status="updated",
|
||||
message=f"Visibility updated to {'private' if private else 'public'}.",
|
||||
)
|
||||
except HttpError as exc:
|
||||
_raise_mapped_http_error(exc, host=spec.host)
|
||||
return EnsureResult(status="failed", message="Unreachable error mapping.")
|
||||
127
tests/e2e/test_mirror_visibility_smoke.py
Normal file
127
tests/e2e/test_mirror_visibility_smoke.py
Normal file
@@ -0,0 +1,127 @@
|
||||
# tests/e2e/test_mirror_visibility_smoke.py
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
class TestMirrorVisibilityE2ESmoke(unittest.TestCase):
|
||||
"""
|
||||
E2E smoke tests for the new mirror visibility feature.
|
||||
|
||||
We intentionally DO NOT execute provider APIs or require tokens.
|
||||
The tests only verify that:
|
||||
- CLI exposes the new subcommands / flags via --help
|
||||
- Python public API surface is wired and importable
|
||||
|
||||
IMPORTANT:
|
||||
- `python -m pkgmgr.cli` is NOT valid unless pkgmgr/cli/__main__.py exists.
|
||||
- In this repo, `from pkgmgr.cli import main` is the stable entrypoint.
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def _project_root() -> Path:
|
||||
# tests/e2e/... -> project root is parents[2]
|
||||
return Path(__file__).resolve().parents[2]
|
||||
|
||||
def _run(self, args: list[str]) -> subprocess.CompletedProcess[str]:
|
||||
env = os.environ.copy()
|
||||
env.setdefault("PYTHONUNBUFFERED", "1")
|
||||
|
||||
return subprocess.run(
|
||||
args,
|
||||
cwd=str(self._project_root()),
|
||||
env=env,
|
||||
text=True,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT,
|
||||
check=False,
|
||||
)
|
||||
|
||||
def _run_pkgmgr(self, pkgmgr_args: list[str]) -> subprocess.CompletedProcess[str]:
|
||||
"""
|
||||
Run the pkgmgr CLI in a way that works both:
|
||||
- when the console script `pkgmgr` is available on PATH
|
||||
- when only source imports are available
|
||||
|
||||
We prefer the console script if present because it's closest to real E2E.
|
||||
"""
|
||||
exe = shutil.which("pkgmgr")
|
||||
if exe:
|
||||
return self._run([exe, *pkgmgr_args])
|
||||
|
||||
# Fallback to a Python-level entrypoint that exists in your repo:
|
||||
# The stacktrace showed: from pkgmgr.cli import main
|
||||
# We call it with argv simulation.
|
||||
code = r"""
|
||||
import sys
|
||||
from pkgmgr.cli import main
|
||||
|
||||
sys.argv = ["pkgmgr"] + sys.argv[1:]
|
||||
main()
|
||||
"""
|
||||
return self._run([sys.executable, "-c", code, *pkgmgr_args])
|
||||
|
||||
def test_cli_help_lists_visibility_and_provision_public(self) -> None:
|
||||
# `pkgmgr mirror --help` should mention "visibility"
|
||||
p = self._run_pkgmgr(["mirror", "--help"])
|
||||
self.assertEqual(
|
||||
p.returncode,
|
||||
0,
|
||||
msg=f"Expected exit code 0, got {p.returncode}\n\nOutput:\n{p.stdout}",
|
||||
)
|
||||
out_lower = p.stdout.lower()
|
||||
self.assertIn("visibility", out_lower)
|
||||
self.assertIn("provision", out_lower)
|
||||
|
||||
# `pkgmgr mirror provision --help` should show `--public`
|
||||
p = self._run_pkgmgr(["mirror", "provision", "--help"])
|
||||
self.assertEqual(
|
||||
p.returncode,
|
||||
0,
|
||||
msg=f"Expected exit code 0, got {p.returncode}\n\nOutput:\n{p.stdout}",
|
||||
)
|
||||
self.assertIn("--public", p.stdout)
|
||||
|
||||
# `pkgmgr mirror visibility --help` should show choices {private, public}
|
||||
p = self._run_pkgmgr(["mirror", "visibility", "--help"])
|
||||
self.assertEqual(
|
||||
p.returncode,
|
||||
0,
|
||||
msg=f"Expected exit code 0, got {p.returncode}\n\nOutput:\n{p.stdout}",
|
||||
)
|
||||
out_lower = p.stdout.lower()
|
||||
self.assertIn("private", out_lower)
|
||||
self.assertIn("public", out_lower)
|
||||
|
||||
def test_python_api_surface_is_exposed(self) -> None:
|
||||
# Ensure public exports exist and setup_mirrors has ensure_visibility in signature.
|
||||
code = r"""
|
||||
import inspect
|
||||
|
||||
from pkgmgr.actions import mirror as mirror_actions
|
||||
from pkgmgr.core import remote_provisioning as rp
|
||||
|
||||
assert hasattr(mirror_actions, "set_mirror_visibility"), "set_mirror_visibility missing in pkgmgr.actions.mirror"
|
||||
assert hasattr(rp, "set_repo_visibility"), "set_repo_visibility missing in pkgmgr.core.remote_provisioning"
|
||||
|
||||
sig = inspect.signature(mirror_actions.setup_mirrors)
|
||||
assert "ensure_visibility" in sig.parameters, "setup_mirrors missing ensure_visibility parameter"
|
||||
|
||||
print("OK")
|
||||
"""
|
||||
p = self._run([sys.executable, "-c", code])
|
||||
self.assertEqual(
|
||||
p.returncode,
|
||||
0,
|
||||
msg=f"Expected exit code 0, got {p.returncode}\n\nOutput:\n{p.stdout}",
|
||||
)
|
||||
self.assertIn("OK", p.stdout)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
124
tests/integration/test_config_defaults_integration.py
Normal file
124
tests/integration/test_config_defaults_integration.py
Normal file
@@ -0,0 +1,124 @@
|
||||
# tests/integration/test_config_defaults_integration.py
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
import types
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
import yaml
|
||||
|
||||
from pkgmgr.core.config.load import load_config
|
||||
from pkgmgr.cli.commands import config as config_cmd
|
||||
|
||||
|
||||
class ConfigDefaultsIntegrationTest(unittest.TestCase):
|
||||
def test_defaults_yaml_is_loaded_and_can_be_copied_to_user_config_dir(self):
|
||||
"""
|
||||
Integration test:
|
||||
- Create a temp "site-packages/pkgmgr" fake install root
|
||||
- Put defaults under "<project_root>/config/defaults.yaml"
|
||||
where project_root == pkg_root.parent (as per your current logic)
|
||||
- Verify:
|
||||
A) load_config() picks up defaults from that config folder when user dir has no defaults
|
||||
B) _update_default_configs() copies defaults.yaml into ~/.config/pkgmgr/
|
||||
"""
|
||||
|
||||
with tempfile.TemporaryDirectory() as td:
|
||||
root = Path(td)
|
||||
|
||||
# Fake HOME for user config
|
||||
home = root / "home"
|
||||
user_cfg_dir = home / ".config" / "pkgmgr"
|
||||
user_cfg_dir.mkdir(parents=True)
|
||||
user_config_path = str(user_cfg_dir / "config.yaml")
|
||||
|
||||
# Create a user config file that should NOT be overwritten by update
|
||||
(user_cfg_dir / "config.yaml").write_text(
|
||||
yaml.safe_dump({"directories": {"user_only": "/home/user"}}),
|
||||
encoding="utf-8",
|
||||
)
|
||||
|
||||
# Fake pkg install layout:
|
||||
# pkg_root = <root>/site-packages/pkgmgr
|
||||
# project_root = pkg_root.parent = <root>/site-packages
|
||||
site_packages = root / "site-packages"
|
||||
pkg_root = site_packages / "pkgmgr"
|
||||
pkg_root.mkdir(parents=True)
|
||||
|
||||
# This is the "project_root/config" candidate for both:
|
||||
# - load.py: roots include pkg_root.parent -> site-packages, so it checks site-packages/config
|
||||
# - cli/config.py: project_root == pkg_root.parent -> site-packages, so it checks site-packages/config
|
||||
config_dir = site_packages / "config"
|
||||
config_dir.mkdir(parents=True)
|
||||
|
||||
defaults_payload = {
|
||||
"directories": {
|
||||
"repositories": "/opt/Repositories",
|
||||
"binaries": "/usr/local/bin",
|
||||
},
|
||||
"repositories": [
|
||||
{"provider": "github", "account": "acme", "repository": "demo"}
|
||||
],
|
||||
}
|
||||
(config_dir / "defaults.yaml").write_text(
|
||||
yaml.safe_dump(defaults_payload),
|
||||
encoding="utf-8",
|
||||
)
|
||||
|
||||
# Provide fake pkgmgr module so your functions resolve pkg_root correctly
|
||||
fake_pkgmgr = types.SimpleNamespace(__file__=str(pkg_root / "__init__.py"))
|
||||
|
||||
with patch.dict(sys.modules, {"pkgmgr": fake_pkgmgr}):
|
||||
with patch.dict(os.environ, {"HOME": str(home)}):
|
||||
# A) load_config should fall back to site-packages/config/defaults.yaml
|
||||
merged = load_config(user_config_path)
|
||||
|
||||
self.assertEqual(
|
||||
merged["directories"]["repositories"], "/opt/Repositories"
|
||||
)
|
||||
self.assertEqual(
|
||||
merged["directories"]["binaries"], "/usr/local/bin"
|
||||
)
|
||||
|
||||
# user-only key must still exist (user config merges over defaults)
|
||||
self.assertEqual(merged["directories"]["user_only"], "/home/user")
|
||||
|
||||
self.assertIn("repositories", merged)
|
||||
self.assertTrue(
|
||||
any(
|
||||
r.get("provider") == "github"
|
||||
and r.get("account") == "acme"
|
||||
and r.get("repository") == "demo"
|
||||
for r in merged["repositories"]
|
||||
)
|
||||
)
|
||||
|
||||
# B) update_default_configs should copy defaults.yaml to ~/.config/pkgmgr/
|
||||
# (and should not overwrite config.yaml)
|
||||
before_config_yaml = (user_cfg_dir / "config.yaml").read_text(
|
||||
encoding="utf-8"
|
||||
)
|
||||
|
||||
config_cmd._update_default_configs(user_config_path)
|
||||
|
||||
self.assertTrue((user_cfg_dir / "defaults.yaml").is_file())
|
||||
copied_defaults = yaml.safe_load(
|
||||
(user_cfg_dir / "defaults.yaml").read_text(encoding="utf-8")
|
||||
)
|
||||
self.assertEqual(
|
||||
copied_defaults["directories"]["repositories"],
|
||||
"/opt/Repositories",
|
||||
)
|
||||
|
||||
after_config_yaml = (user_cfg_dir / "config.yaml").read_text(
|
||||
encoding="utf-8"
|
||||
)
|
||||
self.assertEqual(after_config_yaml, before_config_yaml)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -113,17 +113,12 @@ class TestIntegrationMirrorCommands(unittest.TestCase):
|
||||
)
|
||||
)
|
||||
|
||||
# Deterministic remote probing (new refactor: probe_remote_reachable)
|
||||
# Deterministic remote probing (refactor: probe_remote_reachable_detail)
|
||||
# Patch where it is USED (setup_cmd imported it directly).
|
||||
stack.enter_context(
|
||||
_p(
|
||||
"pkgmgr.core.git.queries.probe_remote_reachable",
|
||||
return_value=True,
|
||||
)
|
||||
)
|
||||
stack.enter_context(
|
||||
_p(
|
||||
"pkgmgr.actions.mirror.setup_cmd.probe_remote_reachable",
|
||||
return_value=True,
|
||||
"pkgmgr.actions.mirror.setup_cmd.probe_remote_reachable_detail",
|
||||
return_value=(True, ""),
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
220
tests/integration/test_mirror_probe_detail_and_provision.py
Normal file
220
tests/integration/test_mirror_probe_detail_and_provision.py
Normal file
@@ -0,0 +1,220 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
"""
|
||||
Integration test for mirror probing + provisioning after refactor.
|
||||
|
||||
We test the CLI entrypoint `handle_mirror_command()` directly to avoid
|
||||
depending on repo-selection / config parsing for `--all`.
|
||||
|
||||
Covers:
|
||||
- setup_cmd uses probe_remote_reachable_detail()
|
||||
- check prints [OK]/[WARN] and 'reason:' lines for failures
|
||||
- provision triggers ensure_remote_repo (preview-safe) for each git mirror
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import io
|
||||
import tempfile
|
||||
import unittest
|
||||
from contextlib import redirect_stderr, redirect_stdout
|
||||
from pathlib import Path
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import MagicMock, PropertyMock, patch
|
||||
|
||||
from pkgmgr.cli.commands.mirror import handle_mirror_command
|
||||
|
||||
|
||||
class TestIntegrationMirrorProbeDetailAndProvision(unittest.TestCase):
|
||||
def _make_ctx(
|
||||
self, *, repositories_base_dir: str, all_repositories: list[dict]
|
||||
) -> MagicMock:
|
||||
ctx = MagicMock()
|
||||
ctx.repositories_base_dir = repositories_base_dir
|
||||
ctx.all_repositories = all_repositories
|
||||
# mirror merge may look at this; keep it present for safety
|
||||
ctx.user_config_path = str(Path(repositories_base_dir) / "user.yml")
|
||||
return ctx
|
||||
|
||||
def _make_dummy_repo_ctx(self, *, repo_dir: str) -> MagicMock:
|
||||
"""
|
||||
This is the RepoMirrorContext-like object returned by build_context().
|
||||
"""
|
||||
dummy = MagicMock()
|
||||
dummy.identifier = "dummy-repo"
|
||||
dummy.repo_dir = repo_dir
|
||||
dummy.config_mirrors = {"origin": "git@github.com:alice/repo.git"}
|
||||
dummy.file_mirrors = {"backup": "ssh://git@git.example:2201/alice/repo.git"}
|
||||
type(dummy).resolved_mirrors = PropertyMock(
|
||||
return_value={
|
||||
"origin": "git@github.com:alice/repo.git",
|
||||
"backup": "ssh://git@git.example:2201/alice/repo.git",
|
||||
}
|
||||
)
|
||||
return dummy
|
||||
|
||||
def _run_handle(
|
||||
self,
|
||||
*,
|
||||
subcommand: str,
|
||||
preview: bool,
|
||||
selected: list[dict],
|
||||
dummy_repo_dir: str,
|
||||
probe_detail_side_effect,
|
||||
) -> str:
|
||||
"""
|
||||
Run handle_mirror_command() with patched side effects and capture output.
|
||||
"""
|
||||
args = SimpleNamespace(subcommand=subcommand, preview=preview)
|
||||
|
||||
# Fake ensure_remote_repo result (preview safe)
|
||||
def _fake_ensure_remote_repo(spec, provider_hint=None, options=None):
|
||||
if options is not None and getattr(options, "preview", False) is not True:
|
||||
raise AssertionError(
|
||||
"ensure_remote_repo called without preview=True (should never happen in tests)."
|
||||
)
|
||||
r = MagicMock()
|
||||
r.status = "preview"
|
||||
r.message = "Preview mode: no remote provisioning performed."
|
||||
r.url = None
|
||||
return r
|
||||
|
||||
buf = io.StringIO()
|
||||
ctx = self._make_ctx(
|
||||
repositories_base_dir=str(Path(dummy_repo_dir).parent),
|
||||
all_repositories=selected,
|
||||
)
|
||||
dummy_repo_ctx = self._make_dummy_repo_ctx(repo_dir=dummy_repo_dir)
|
||||
|
||||
with (
|
||||
patch(
|
||||
"pkgmgr.actions.mirror.setup_cmd.build_context",
|
||||
return_value=dummy_repo_ctx,
|
||||
),
|
||||
patch(
|
||||
"pkgmgr.actions.mirror.setup_cmd.ensure_origin_remote",
|
||||
return_value=None,
|
||||
),
|
||||
patch(
|
||||
"pkgmgr.actions.mirror.git_remote.ensure_origin_remote",
|
||||
return_value=None,
|
||||
),
|
||||
patch(
|
||||
"pkgmgr.actions.mirror.setup_cmd.probe_remote_reachable_detail",
|
||||
side_effect=probe_detail_side_effect,
|
||||
),
|
||||
patch(
|
||||
"pkgmgr.actions.mirror.remote_provision.ensure_remote_repo",
|
||||
side_effect=_fake_ensure_remote_repo,
|
||||
),
|
||||
redirect_stdout(buf),
|
||||
redirect_stderr(buf),
|
||||
):
|
||||
handle_mirror_command(ctx, args, selected)
|
||||
|
||||
return buf.getvalue()
|
||||
|
||||
def test_mirror_check_preview_prints_warn_reason(self) -> None:
|
||||
"""
|
||||
'mirror check --preview' should:
|
||||
- probe both git mirrors
|
||||
- print [OK] for origin
|
||||
- print [WARN] for backup + reason line
|
||||
"""
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
tmp_path = Path(tmp)
|
||||
repo_dir = tmp_path / "dummy-repo"
|
||||
repo_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
selected = [
|
||||
{"provider": "github.com", "account": "alice", "repository": "repo"}
|
||||
]
|
||||
|
||||
def probe_side_effect(url: str, cwd: str = "."):
|
||||
if "github.com" in url:
|
||||
# show "empty repo reachable" note; setup_cmd prints [OK] and does not print reason for ok
|
||||
return (
|
||||
True,
|
||||
"remote reachable, but no refs found yet (empty repository)",
|
||||
)
|
||||
return False, "(exit 128) fatal: Could not read from remote repository."
|
||||
|
||||
out = self._run_handle(
|
||||
subcommand="check",
|
||||
preview=True,
|
||||
selected=selected,
|
||||
dummy_repo_dir=str(repo_dir),
|
||||
probe_detail_side_effect=probe_side_effect,
|
||||
)
|
||||
|
||||
self.assertIn("[MIRROR SETUP:REMOTE]", out)
|
||||
|
||||
# origin OK (even with a note returned; still OK)
|
||||
self.assertIn("[OK] origin: git@github.com:alice/repo.git", out)
|
||||
|
||||
# backup WARN prints reason line
|
||||
self.assertIn(
|
||||
"[WARN] backup: ssh://git@git.example:2201/alice/repo.git", out
|
||||
)
|
||||
self.assertIn("reason:", out)
|
||||
self.assertIn("Could not read from remote repository", out)
|
||||
|
||||
def test_mirror_provision_preview_provisions_each_git_mirror(self) -> None:
|
||||
"""
|
||||
'mirror provision --preview' should:
|
||||
- print provisioning lines for each git mirror
|
||||
- still probe and print [OK]/[WARN]
|
||||
- call ensure_remote_repo only in preview mode (enforced by fake)
|
||||
"""
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
tmp_path = Path(tmp)
|
||||
repo_dir = tmp_path / "dummy-repo"
|
||||
repo_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
selected = [
|
||||
{
|
||||
"provider": "github.com",
|
||||
"account": "alice",
|
||||
"repository": "repo",
|
||||
"private": True,
|
||||
"description": "desc",
|
||||
}
|
||||
]
|
||||
|
||||
def probe_side_effect(url: str, cwd: str = "."):
|
||||
if "github.com" in url:
|
||||
return True, ""
|
||||
return False, "(exit 128) fatal: Could not read from remote repository."
|
||||
|
||||
out = self._run_handle(
|
||||
subcommand="provision",
|
||||
preview=True,
|
||||
selected=selected,
|
||||
dummy_repo_dir=str(repo_dir),
|
||||
probe_detail_side_effect=probe_side_effect,
|
||||
)
|
||||
|
||||
# provisioning should attempt BOTH mirrors
|
||||
self.assertIn(
|
||||
"[REMOTE ENSURE] ensuring mirror 'origin': git@github.com:alice/repo.git",
|
||||
out,
|
||||
)
|
||||
self.assertIn(
|
||||
"[REMOTE ENSURE] ensuring mirror 'backup': ssh://git@git.example:2201/alice/repo.git",
|
||||
out,
|
||||
)
|
||||
|
||||
# patched ensure_remote_repo prints PREVIEW status via remote_provision
|
||||
self.assertIn("[REMOTE ENSURE]", out)
|
||||
self.assertIn("PREVIEW", out.upper())
|
||||
|
||||
# probes after provisioning
|
||||
self.assertIn("[OK] origin: git@github.com:alice/repo.git", out)
|
||||
self.assertIn(
|
||||
"[WARN] backup: ssh://git@git.example:2201/alice/repo.git", out
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -40,8 +40,8 @@ class TestCreateRepoPypiNotInGitConfig(unittest.TestCase):
|
||||
with (
|
||||
# Avoid any real network calls during mirror "remote probing"
|
||||
patch(
|
||||
"pkgmgr.actions.mirror.setup_cmd.probe_remote_reachable",
|
||||
return_value=True,
|
||||
"pkgmgr.actions.mirror.setup_cmd.probe_remote_reachable_detail",
|
||||
return_value=(True, ""),
|
||||
),
|
||||
# Force templates to come from our temp directory
|
||||
patch(
|
||||
|
||||
322
tests/integration/test_visibility_integration.py
Normal file
322
tests/integration/test_visibility_integration.py
Normal file
@@ -0,0 +1,322 @@
|
||||
# tests/integration/test_visibility_integration.py
|
||||
from __future__ import annotations
|
||||
|
||||
import io
|
||||
import os
|
||||
import tempfile
|
||||
import types
|
||||
import unittest
|
||||
from contextlib import redirect_stdout
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
from unittest.mock import patch
|
||||
|
||||
from pkgmgr.actions.mirror.setup_cmd import setup_mirrors
|
||||
from pkgmgr.actions.mirror.visibility_cmd import set_mirror_visibility
|
||||
from pkgmgr.core.remote_provisioning.types import RepoSpec
|
||||
|
||||
|
||||
Repository = Dict[str, Any]
|
||||
|
||||
|
||||
class _FakeRegistry:
|
||||
"""
|
||||
Minimal ProviderRegistry-like object for tests.
|
||||
|
||||
- has .providers for provider-hint selection
|
||||
- has .resolve(host) to pick a provider
|
||||
"""
|
||||
|
||||
def __init__(self, provider: Any) -> None:
|
||||
self.providers = [provider]
|
||||
self._provider = provider
|
||||
|
||||
def resolve(self, host: str) -> Any:
|
||||
return self._provider
|
||||
|
||||
|
||||
class FakeProvider:
|
||||
"""
|
||||
Fake remote provider implementing the visibility API surface.
|
||||
|
||||
Key feature: tolerant host matching, because normalize_provider_host()/URL parsing
|
||||
may drop ports or schemes.
|
||||
"""
|
||||
|
||||
kind = "gitea"
|
||||
|
||||
def __init__(self) -> None:
|
||||
# maps (host, owner, name) -> private(bool)
|
||||
self.privacy: Dict[Tuple[str, str, str], bool] = {}
|
||||
self.calls: List[Tuple[str, Any]] = []
|
||||
|
||||
def can_handle(self, host: str) -> bool:
|
||||
return True
|
||||
|
||||
def _candidate_hosts(self, host: str) -> List[str]:
|
||||
"""
|
||||
Be tolerant against host normalization differences:
|
||||
- may contain scheme (https://...)
|
||||
- may contain port (host:2201)
|
||||
"""
|
||||
h = (host or "").strip()
|
||||
if not h:
|
||||
return [h]
|
||||
|
||||
candidates = [h]
|
||||
|
||||
# strip scheme if present
|
||||
if h.startswith("http://"):
|
||||
candidates.append(h[len("http://") :])
|
||||
if h.startswith("https://"):
|
||||
candidates.append(h[len("https://") :])
|
||||
|
||||
# strip port if present (host:port)
|
||||
for c in list(candidates):
|
||||
if ":" in c:
|
||||
candidates.append(c.split(":", 1)[0])
|
||||
|
||||
# de-dup
|
||||
out: List[str] = []
|
||||
for c in candidates:
|
||||
if c not in out:
|
||||
out.append(c)
|
||||
return out
|
||||
|
||||
def repo_exists(self, token: str, spec: RepoSpec) -> bool:
|
||||
self.calls.append(("repo_exists", (token, spec)))
|
||||
for h in self._candidate_hosts(spec.host):
|
||||
if (h, spec.owner, spec.name) in self.privacy:
|
||||
return True
|
||||
return False
|
||||
|
||||
def create_repo(self, token: str, spec: RepoSpec):
|
||||
self.calls.append(("create_repo", (token, spec)))
|
||||
# store under the provided host (as-is)
|
||||
self.privacy[(spec.host, spec.owner, spec.name)] = bool(spec.private)
|
||||
return types.SimpleNamespace(status="created", message="created", url=None)
|
||||
|
||||
def get_repo_private(self, token: str, spec: RepoSpec) -> Optional[bool]:
|
||||
self.calls.append(("get_repo_private", (token, spec)))
|
||||
for h in self._candidate_hosts(spec.host):
|
||||
key = (h, spec.owner, spec.name)
|
||||
if key in self.privacy:
|
||||
return self.privacy[key]
|
||||
return None
|
||||
|
||||
def set_repo_private(self, token: str, spec: RepoSpec, *, private: bool) -> None:
|
||||
self.calls.append(("set_repo_private", (token, spec, private)))
|
||||
# update whichever key exists; else create on spec.host
|
||||
for h in self._candidate_hosts(spec.host):
|
||||
key = (h, spec.owner, spec.name)
|
||||
if key in self.privacy:
|
||||
self.privacy[key] = bool(private)
|
||||
return
|
||||
self.privacy[(spec.host, spec.owner, spec.name)] = bool(private)
|
||||
|
||||
|
||||
def _mk_ctx(*, identifier: str, repo_dir: str, mirrors: Dict[str, str]) -> Any:
|
||||
return types.SimpleNamespace(
|
||||
identifier=identifier,
|
||||
repo_dir=repo_dir,
|
||||
resolved_mirrors=mirrors,
|
||||
)
|
||||
|
||||
|
||||
class TestMirrorVisibilityIntegration(unittest.TestCase):
|
||||
"""
|
||||
Integration tests for:
|
||||
- pkgmgr.actions.mirror.visibility_cmd.set_mirror_visibility
|
||||
- pkgmgr.actions.mirror.setup_cmd.setup_mirrors (ensure_visibility semantics)
|
||||
"""
|
||||
|
||||
def setUp(self) -> None:
|
||||
self.tmp = tempfile.TemporaryDirectory()
|
||||
self.addCleanup(self.tmp.cleanup)
|
||||
|
||||
def _repo_dir(self, name: str) -> str:
|
||||
d = os.path.join(self.tmp.name, name)
|
||||
os.makedirs(d, exist_ok=True)
|
||||
return d
|
||||
|
||||
@patch("pkgmgr.core.credentials.resolver.TokenResolver.get_token")
|
||||
@patch("pkgmgr.core.remote_provisioning.visibility.ProviderRegistry.default")
|
||||
@patch("pkgmgr.actions.mirror.visibility_cmd.build_context")
|
||||
def test_mirror_visibility_applies_to_all_git_mirrors_updated_and_noop(
|
||||
self,
|
||||
m_build_context,
|
||||
m_registry_default,
|
||||
m_get_token,
|
||||
) -> None:
|
||||
"""
|
||||
Scenario:
|
||||
- repo has two git mirrors
|
||||
- one mirror needs update -> UPDATED
|
||||
- second mirror already desired -> NOOP
|
||||
"""
|
||||
provider = FakeProvider()
|
||||
registry = _FakeRegistry(provider)
|
||||
m_registry_default.return_value = registry
|
||||
|
||||
# Avoid interactive token prompt
|
||||
m_get_token.return_value = types.SimpleNamespace(token="test-token")
|
||||
|
||||
# Seed provider state:
|
||||
# - repo1 currently private=True
|
||||
# - We'll set visibility to public -> should UPDATE
|
||||
provider.privacy[("git.veen.world", "me", "repo1")] = True
|
||||
|
||||
repo = {"id": "repo1", "description": "Repo 1"}
|
||||
repo_dir = self._repo_dir("repo1")
|
||||
|
||||
m_build_context.return_value = _mk_ctx(
|
||||
identifier="repo1",
|
||||
repo_dir=repo_dir,
|
||||
mirrors={
|
||||
"origin": "ssh://git.veen.world:2201/me/repo1.git",
|
||||
"backup": "https://git.veen.world:2201/me/repo1.git",
|
||||
},
|
||||
)
|
||||
|
||||
buf = io.StringIO()
|
||||
with redirect_stdout(buf):
|
||||
set_mirror_visibility(
|
||||
selected_repos=[repo],
|
||||
repositories_base_dir=self.tmp.name,
|
||||
all_repos=[repo],
|
||||
visibility="public",
|
||||
preview=False,
|
||||
)
|
||||
out = buf.getvalue()
|
||||
|
||||
# We apply to BOTH git mirrors.
|
||||
self.assertIn("[MIRROR VISIBILITY] applying to mirror 'origin':", out)
|
||||
self.assertIn("[MIRROR VISIBILITY] applying to mirror 'backup':", out)
|
||||
|
||||
# After first update, second call will see it already public (NOOP).
|
||||
self.assertIn("[REMOTE VISIBILITY] UPDATED:", out)
|
||||
self.assertIn("[REMOTE VISIBILITY] NOOP:", out)
|
||||
|
||||
# Final state must be public (private=False)
|
||||
self.assertFalse(provider.privacy[("git.veen.world", "me", "repo1")])
|
||||
|
||||
@patch("pkgmgr.core.credentials.resolver.TokenResolver.get_token")
|
||||
@patch("pkgmgr.core.remote_provisioning.visibility.ProviderRegistry.default")
|
||||
@patch("pkgmgr.actions.mirror.visibility_cmd.build_context")
|
||||
@patch("pkgmgr.actions.mirror.visibility_cmd.determine_primary_remote_url")
|
||||
def test_mirror_visibility_fallback_to_primary_when_no_git_mirrors(
|
||||
self,
|
||||
m_determine_primary,
|
||||
m_build_context,
|
||||
m_registry_default,
|
||||
m_get_token,
|
||||
) -> None:
|
||||
"""
|
||||
Scenario:
|
||||
- no git mirrors in MIRRORS config
|
||||
- we fall back to primary URL and apply visibility there
|
||||
"""
|
||||
provider = FakeProvider()
|
||||
registry = _FakeRegistry(provider)
|
||||
m_registry_default.return_value = registry
|
||||
m_get_token.return_value = types.SimpleNamespace(token="test-token")
|
||||
|
||||
# Seed state: currently public (private=False), target private -> UPDATED
|
||||
provider.privacy[("git.veen.world", "me", "repo2")] = False
|
||||
|
||||
repo = {"id": "repo2", "description": "Repo 2"}
|
||||
repo_dir = self._repo_dir("repo2")
|
||||
|
||||
m_build_context.return_value = _mk_ctx(
|
||||
identifier="repo2",
|
||||
repo_dir=repo_dir,
|
||||
mirrors={
|
||||
# non-git mirror entries
|
||||
"pypi": "https://pypi.org/project/example/",
|
||||
},
|
||||
)
|
||||
m_determine_primary.return_value = "ssh://git.veen.world:2201/me/repo2.git"
|
||||
|
||||
buf = io.StringIO()
|
||||
with redirect_stdout(buf):
|
||||
set_mirror_visibility(
|
||||
selected_repos=[repo],
|
||||
repositories_base_dir=self.tmp.name,
|
||||
all_repos=[repo],
|
||||
visibility="private",
|
||||
preview=False,
|
||||
)
|
||||
out = buf.getvalue()
|
||||
|
||||
self.assertIn("[MIRROR VISIBILITY] applying to primary:", out)
|
||||
self.assertIn("[REMOTE VISIBILITY] UPDATED:", out)
|
||||
self.assertTrue(provider.privacy[("git.veen.world", "me", "repo2")])
|
||||
|
||||
@patch("pkgmgr.actions.mirror.setup_cmd.probe_remote_reachable_detail")
|
||||
@patch("pkgmgr.actions.mirror.setup_cmd.ensure_remote_repository_for_url")
|
||||
@patch("pkgmgr.core.credentials.resolver.TokenResolver.get_token")
|
||||
@patch("pkgmgr.core.remote_provisioning.visibility.ProviderRegistry.default")
|
||||
@patch("pkgmgr.actions.mirror.setup_cmd.build_context")
|
||||
def test_setup_mirrors_provision_public_enforces_visibility_and_private_default(
|
||||
self,
|
||||
m_build_context,
|
||||
m_registry_default,
|
||||
m_get_token,
|
||||
m_ensure_remote_for_url,
|
||||
m_probe,
|
||||
) -> None:
|
||||
"""
|
||||
Covers the "mirror provision --public" semantics:
|
||||
- setup_mirrors(remote=True, ensure_remote=True, ensure_visibility="public")
|
||||
- ensure_remote_repository_for_url is called with private_default=False
|
||||
- then set_repo_visibility is applied (UPDATED/NOOP depending on current state)
|
||||
- git probing is mocked (no subprocess)
|
||||
"""
|
||||
provider = FakeProvider()
|
||||
registry = _FakeRegistry(provider)
|
||||
m_registry_default.return_value = registry
|
||||
m_get_token.return_value = types.SimpleNamespace(token="test-token")
|
||||
|
||||
# Make git probing always OK (no subprocess calls)
|
||||
m_probe.return_value = (True, "")
|
||||
|
||||
# Seed provider: repo4 currently private=True, target public -> UPDATED
|
||||
provider.privacy[("git.veen.world", "me", "repo4")] = True
|
||||
|
||||
repo = {"id": "repo4", "description": "Repo 4", "private": True}
|
||||
repo_dir = self._repo_dir("repo4")
|
||||
|
||||
m_build_context.return_value = _mk_ctx(
|
||||
identifier="repo4",
|
||||
repo_dir=repo_dir,
|
||||
mirrors={
|
||||
"origin": "ssh://git.veen.world:2201/me/repo4.git",
|
||||
},
|
||||
)
|
||||
|
||||
buf = io.StringIO()
|
||||
with redirect_stdout(buf):
|
||||
setup_mirrors(
|
||||
selected_repos=[repo],
|
||||
repositories_base_dir=self.tmp.name,
|
||||
all_repos=[repo],
|
||||
preview=False,
|
||||
local=False,
|
||||
remote=True,
|
||||
ensure_remote=True,
|
||||
ensure_visibility="public",
|
||||
)
|
||||
out = buf.getvalue()
|
||||
|
||||
# ensure_remote_repository_for_url called and private_default overridden to False
|
||||
self.assertTrue(m_ensure_remote_for_url.called)
|
||||
_, kwargs = m_ensure_remote_for_url.call_args
|
||||
self.assertIn("private_default", kwargs)
|
||||
self.assertFalse(kwargs["private_default"])
|
||||
|
||||
# Visibility should be enforced
|
||||
self.assertIn("[REMOTE VISIBILITY] UPDATED:", out)
|
||||
self.assertFalse(provider.privacy[("git.veen.world", "me", "repo4")])
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -38,7 +38,6 @@ class TestMirrorSetupCmd(unittest.TestCase):
|
||||
ensure_remote=False,
|
||||
)
|
||||
|
||||
# ensure_origin_remote(repo, ctx, preview) is called positionally in your code
|
||||
m_ensure.assert_called_once()
|
||||
args, kwargs = m_ensure.call_args
|
||||
|
||||
@@ -50,13 +49,13 @@ class TestMirrorSetupCmd(unittest.TestCase):
|
||||
|
||||
@patch("pkgmgr.actions.mirror.setup_cmd.build_context")
|
||||
@patch("pkgmgr.actions.mirror.setup_cmd.determine_primary_remote_url")
|
||||
@patch("pkgmgr.actions.mirror.setup_cmd.probe_remote_reachable")
|
||||
@patch("pkgmgr.actions.mirror.setup_cmd.probe_remote_reachable_detail")
|
||||
def test_setup_mirrors_remote_no_mirrors_probes_primary(
|
||||
self, m_probe, m_primary, m_ctx
|
||||
self, m_probe_detail, m_primary, m_ctx
|
||||
) -> None:
|
||||
m_ctx.return_value = self._ctx(repo_dir="/tmp/repo", resolved={})
|
||||
m_primary.return_value = "git@github.com:alice/repo.git"
|
||||
m_probe.return_value = True
|
||||
m_probe_detail.return_value = (True, "")
|
||||
|
||||
repos = [{"provider": "github.com", "account": "alice", "repository": "repo"}]
|
||||
setup_mirrors(
|
||||
@@ -70,14 +69,14 @@ class TestMirrorSetupCmd(unittest.TestCase):
|
||||
)
|
||||
|
||||
m_primary.assert_called()
|
||||
m_probe.assert_called_once_with(
|
||||
m_probe_detail.assert_called_once_with(
|
||||
"git@github.com:alice/repo.git", cwd="/tmp/repo"
|
||||
)
|
||||
|
||||
@patch("pkgmgr.actions.mirror.setup_cmd.build_context")
|
||||
@patch("pkgmgr.actions.mirror.setup_cmd.probe_remote_reachable")
|
||||
@patch("pkgmgr.actions.mirror.setup_cmd.probe_remote_reachable_detail")
|
||||
def test_setup_mirrors_remote_with_mirrors_probes_each(
|
||||
self, m_probe, m_ctx
|
||||
self, m_probe_detail, m_ctx
|
||||
) -> None:
|
||||
m_ctx.return_value = self._ctx(
|
||||
repo_dir="/tmp/repo",
|
||||
@@ -86,7 +85,7 @@ class TestMirrorSetupCmd(unittest.TestCase):
|
||||
"backup": "ssh://git@git.veen.world:2201/alice/repo.git",
|
||||
},
|
||||
)
|
||||
m_probe.return_value = True
|
||||
m_probe_detail.return_value = (True, "")
|
||||
|
||||
repos = [{"provider": "github.com", "account": "alice", "repository": "repo"}]
|
||||
setup_mirrors(
|
||||
@@ -99,12 +98,105 @@ class TestMirrorSetupCmd(unittest.TestCase):
|
||||
ensure_remote=False,
|
||||
)
|
||||
|
||||
self.assertEqual(m_probe.call_count, 2)
|
||||
m_probe.assert_any_call("git@github.com:alice/repo.git", cwd="/tmp/repo")
|
||||
m_probe.assert_any_call(
|
||||
# Should probe BOTH git mirror URLs
|
||||
self.assertEqual(m_probe_detail.call_count, 2)
|
||||
m_probe_detail.assert_any_call("git@github.com:alice/repo.git", cwd="/tmp/repo")
|
||||
m_probe_detail.assert_any_call(
|
||||
"ssh://git@git.veen.world:2201/alice/repo.git", cwd="/tmp/repo"
|
||||
)
|
||||
|
||||
@patch("pkgmgr.actions.mirror.setup_cmd.build_context")
|
||||
@patch("pkgmgr.actions.mirror.setup_cmd.probe_remote_reachable_detail")
|
||||
@patch("pkgmgr.actions.mirror.setup_cmd.ensure_remote_repository_for_url")
|
||||
def test_setup_mirrors_remote_with_mirrors_ensure_remote_provisions_each(
|
||||
self, m_ensure_url, m_probe_detail, m_ctx
|
||||
) -> None:
|
||||
m_ctx.return_value = self._ctx(
|
||||
repo_dir="/tmp/repo",
|
||||
resolved={
|
||||
"origin": "git@github.com:alice/repo.git",
|
||||
"backup": "ssh://git@git.veen.world:2201/alice/repo.git",
|
||||
},
|
||||
)
|
||||
m_probe_detail.return_value = (True, "")
|
||||
|
||||
repos = [
|
||||
{
|
||||
"provider": "github.com",
|
||||
"account": "alice",
|
||||
"repository": "repo",
|
||||
"private": True,
|
||||
"description": "desc",
|
||||
}
|
||||
]
|
||||
setup_mirrors(
|
||||
selected_repos=repos,
|
||||
repositories_base_dir="/tmp",
|
||||
all_repos=repos,
|
||||
preview=True,
|
||||
local=False,
|
||||
remote=True,
|
||||
ensure_remote=True,
|
||||
)
|
||||
|
||||
# Provision both mirrors
|
||||
self.assertEqual(m_ensure_url.call_count, 2)
|
||||
m_ensure_url.assert_any_call(
|
||||
url="git@github.com:alice/repo.git",
|
||||
private_default=True,
|
||||
description="desc",
|
||||
preview=True,
|
||||
)
|
||||
m_ensure_url.assert_any_call(
|
||||
url="ssh://git@git.veen.world:2201/alice/repo.git",
|
||||
private_default=True,
|
||||
description="desc",
|
||||
preview=True,
|
||||
)
|
||||
|
||||
# Still probes both
|
||||
self.assertEqual(m_probe_detail.call_count, 2)
|
||||
|
||||
@patch("pkgmgr.actions.mirror.setup_cmd.build_context")
|
||||
@patch("pkgmgr.actions.mirror.setup_cmd.determine_primary_remote_url")
|
||||
@patch("pkgmgr.actions.mirror.setup_cmd.ensure_remote_repository_for_url")
|
||||
@patch("pkgmgr.actions.mirror.setup_cmd.probe_remote_reachable_detail")
|
||||
def test_setup_mirrors_remote_no_mirrors_ensure_remote_provisions_primary(
|
||||
self, m_probe_detail, m_ensure_url, m_primary, m_ctx
|
||||
) -> None:
|
||||
m_ctx.return_value = self._ctx(repo_dir="/tmp/repo", resolved={})
|
||||
m_primary.return_value = "git@github.com:alice/repo.git"
|
||||
m_probe_detail.return_value = (True, "")
|
||||
|
||||
repos = [
|
||||
{
|
||||
"provider": "github.com",
|
||||
"account": "alice",
|
||||
"repository": "repo",
|
||||
"private": False,
|
||||
"description": "desc",
|
||||
}
|
||||
]
|
||||
setup_mirrors(
|
||||
selected_repos=repos,
|
||||
repositories_base_dir="/tmp",
|
||||
all_repos=repos,
|
||||
preview=True,
|
||||
local=False,
|
||||
remote=True,
|
||||
ensure_remote=True,
|
||||
)
|
||||
|
||||
m_ensure_url.assert_called_once_with(
|
||||
url="git@github.com:alice/repo.git",
|
||||
private_default=False,
|
||||
description="desc",
|
||||
preview=True,
|
||||
)
|
||||
m_probe_detail.assert_called_once_with(
|
||||
"git@github.com:alice/repo.git", cwd="/tmp/repo"
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
177
tests/unit/pkgmgr/actions/mirror/test_visibility_cmd.py
Normal file
177
tests/unit/pkgmgr/actions/mirror/test_visibility_cmd.py
Normal file
@@ -0,0 +1,177 @@
|
||||
# tests/unit/pkgmgr/actions/mirror/test_visibility_cmd.py
|
||||
from __future__ import annotations
|
||||
|
||||
import io
|
||||
import unittest
|
||||
from contextlib import redirect_stdout
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
from pkgmgr.actions.mirror.visibility_cmd import set_mirror_visibility
|
||||
|
||||
|
||||
class TestMirrorVisibilityCmd(unittest.TestCase):
|
||||
def test_invalid_visibility_raises_value_error(self) -> None:
|
||||
with self.assertRaises(ValueError):
|
||||
set_mirror_visibility(
|
||||
selected_repos=[{"id": "x"}],
|
||||
repositories_base_dir="/tmp",
|
||||
all_repos=[],
|
||||
visibility="nope",
|
||||
)
|
||||
|
||||
@patch("pkgmgr.actions.mirror.visibility_cmd.build_context")
|
||||
@patch("pkgmgr.actions.mirror.visibility_cmd.determine_primary_remote_url")
|
||||
def test_no_git_mirrors_and_no_primary_prints_nothing_to_do(
|
||||
self,
|
||||
mock_determine_primary: MagicMock,
|
||||
mock_build_ctx: MagicMock,
|
||||
) -> None:
|
||||
ctx = MagicMock()
|
||||
ctx.identifier = "repo1"
|
||||
ctx.repo_dir = "/tmp/repo1"
|
||||
ctx.resolved_mirrors = {"pypi": "https://pypi.org/project/x/"} # non-git
|
||||
mock_build_ctx.return_value = ctx
|
||||
mock_determine_primary.return_value = None
|
||||
|
||||
buf = io.StringIO()
|
||||
with redirect_stdout(buf):
|
||||
set_mirror_visibility(
|
||||
selected_repos=[{"id": "repo1", "description": "desc"}],
|
||||
repositories_base_dir="/tmp",
|
||||
all_repos=[],
|
||||
visibility="public",
|
||||
preview=True,
|
||||
)
|
||||
|
||||
out = buf.getvalue()
|
||||
self.assertIn("[MIRROR VISIBILITY] repo1", out)
|
||||
self.assertIn("Nothing to do.", out)
|
||||
|
||||
@patch("pkgmgr.actions.mirror.visibility_cmd.build_context")
|
||||
@patch("pkgmgr.actions.mirror.visibility_cmd.determine_primary_remote_url")
|
||||
@patch("pkgmgr.actions.mirror.visibility_cmd.normalize_provider_host")
|
||||
@patch("pkgmgr.actions.mirror.visibility_cmd.parse_repo_from_git_url")
|
||||
@patch("pkgmgr.actions.mirror.visibility_cmd.set_repo_visibility")
|
||||
def test_applies_to_primary_when_no_git_mirrors(
|
||||
self,
|
||||
mock_set_repo_visibility: MagicMock,
|
||||
mock_parse: MagicMock,
|
||||
mock_norm: MagicMock,
|
||||
mock_determine_primary: MagicMock,
|
||||
mock_build_ctx: MagicMock,
|
||||
) -> None:
|
||||
ctx = MagicMock()
|
||||
ctx.identifier = "repo1"
|
||||
ctx.repo_dir = "/tmp/repo1"
|
||||
ctx.resolved_mirrors = {} # no mirrors
|
||||
mock_build_ctx.return_value = ctx
|
||||
|
||||
primary = "ssh://git.veen.world:2201/me/repo1.git"
|
||||
mock_determine_primary.return_value = primary
|
||||
|
||||
mock_parse.return_value = ("git.veen.world:2201", "me", "repo1")
|
||||
mock_norm.return_value = "git.veen.world:2201"
|
||||
|
||||
mock_set_repo_visibility.return_value = MagicMock(
|
||||
status="skipped", message="Preview"
|
||||
)
|
||||
|
||||
buf = io.StringIO()
|
||||
with redirect_stdout(buf):
|
||||
set_mirror_visibility(
|
||||
selected_repos=[{"id": "repo1", "description": "desc"}],
|
||||
repositories_base_dir="/tmp",
|
||||
all_repos=[],
|
||||
visibility="private",
|
||||
preview=True,
|
||||
)
|
||||
|
||||
mock_set_repo_visibility.assert_called_once()
|
||||
_, kwargs = mock_set_repo_visibility.call_args
|
||||
self.assertEqual(
|
||||
kwargs["private"], True
|
||||
) # visibility=private => desired_private=True
|
||||
out = buf.getvalue()
|
||||
self.assertIn("applying to primary", out)
|
||||
|
||||
@patch("pkgmgr.actions.mirror.visibility_cmd.build_context")
|
||||
@patch("pkgmgr.actions.mirror.visibility_cmd.normalize_provider_host")
|
||||
@patch("pkgmgr.actions.mirror.visibility_cmd.parse_repo_from_git_url")
|
||||
@patch("pkgmgr.actions.mirror.visibility_cmd.set_repo_visibility")
|
||||
def test_applies_to_all_git_mirrors(
|
||||
self,
|
||||
mock_set_repo_visibility: MagicMock,
|
||||
mock_parse: MagicMock,
|
||||
mock_norm: MagicMock,
|
||||
mock_build_ctx: MagicMock,
|
||||
) -> None:
|
||||
ctx = MagicMock()
|
||||
ctx.identifier = "repo1"
|
||||
ctx.repo_dir = "/tmp/repo1"
|
||||
ctx.resolved_mirrors = {
|
||||
"origin": "ssh://git.veen.world:2201/me/repo1.git",
|
||||
"backup": "git@git.veen.world:me/repo1.git",
|
||||
"notgit": "https://pypi.org/project/x/",
|
||||
}
|
||||
mock_build_ctx.return_value = ctx
|
||||
|
||||
# For both URLs, parsing returns same repo
|
||||
mock_parse.return_value = ("git.veen.world", "me", "repo1")
|
||||
mock_norm.return_value = "git.veen.world"
|
||||
|
||||
mock_set_repo_visibility.return_value = MagicMock(
|
||||
status="noop", message="Already public"
|
||||
)
|
||||
|
||||
buf = io.StringIO()
|
||||
with redirect_stdout(buf):
|
||||
set_mirror_visibility(
|
||||
selected_repos=[{"id": "repo1", "description": "desc"}],
|
||||
repositories_base_dir="/tmp",
|
||||
all_repos=[],
|
||||
visibility="public",
|
||||
preview=False,
|
||||
)
|
||||
|
||||
# Should be called for origin + backup (2), but not for notgit
|
||||
self.assertEqual(mock_set_repo_visibility.call_count, 2)
|
||||
|
||||
# Each call should request desired private=False for "public"
|
||||
for call in mock_set_repo_visibility.call_args_list:
|
||||
_, kwargs = call
|
||||
self.assertEqual(kwargs["private"], False)
|
||||
|
||||
out = buf.getvalue()
|
||||
self.assertIn("applying to mirror 'origin'", out)
|
||||
self.assertIn("applying to mirror 'backup'", out)
|
||||
|
||||
@patch("pkgmgr.actions.mirror.visibility_cmd.build_context")
|
||||
@patch("pkgmgr.actions.mirror.visibility_cmd.determine_primary_remote_url")
|
||||
def test_primary_not_git_prints_nothing_to_do(
|
||||
self,
|
||||
mock_determine_primary: MagicMock,
|
||||
mock_build_ctx: MagicMock,
|
||||
) -> None:
|
||||
ctx = MagicMock()
|
||||
ctx.identifier = "repo1"
|
||||
ctx.repo_dir = "/tmp/repo1"
|
||||
ctx.resolved_mirrors = {}
|
||||
mock_build_ctx.return_value = ctx
|
||||
|
||||
mock_determine_primary.return_value = "https://example.com/not-a-git-url"
|
||||
|
||||
buf = io.StringIO()
|
||||
with redirect_stdout(buf):
|
||||
set_mirror_visibility(
|
||||
selected_repos=[{"id": "repo1"}],
|
||||
repositories_base_dir="/tmp",
|
||||
all_repos=[],
|
||||
visibility="public",
|
||||
)
|
||||
|
||||
out = buf.getvalue()
|
||||
self.assertIn("Nothing to do.", out)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
135
tests/unit/pkgmgr/core/config/test_cli_update.py
Normal file
135
tests/unit/pkgmgr/core/config/test_cli_update.py
Normal file
@@ -0,0 +1,135 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import io
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
import types
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
from pkgmgr.cli.commands import config as config_cmd
|
||||
|
||||
|
||||
class FindDefaultsSourceDirTests(unittest.TestCase):
|
||||
def test_prefers_pkg_root_config_over_project_root_config(self):
|
||||
with tempfile.TemporaryDirectory() as td:
|
||||
root = Path(td)
|
||||
pkg_root = root / "site-packages" / "pkgmgr"
|
||||
pkg_root.mkdir(parents=True)
|
||||
|
||||
# both exist
|
||||
(pkg_root / "config").mkdir(parents=True)
|
||||
(pkg_root.parent / "config").mkdir(parents=True)
|
||||
|
||||
fake_pkgmgr = types.SimpleNamespace(__file__=str(pkg_root / "__init__.py"))
|
||||
with patch.dict(sys.modules, {"pkgmgr": fake_pkgmgr}):
|
||||
found = config_cmd._find_defaults_source_dir()
|
||||
|
||||
self.assertEqual(Path(found).resolve(), (pkg_root / "config").resolve())
|
||||
|
||||
def test_falls_back_to_project_root_config(self):
|
||||
with tempfile.TemporaryDirectory() as td:
|
||||
root = Path(td)
|
||||
pkg_root = root / "site-packages" / "pkgmgr"
|
||||
pkg_root.mkdir(parents=True)
|
||||
|
||||
# only project_root config exists
|
||||
(pkg_root.parent / "config").mkdir(parents=True)
|
||||
|
||||
fake_pkgmgr = types.SimpleNamespace(__file__=str(pkg_root / "__init__.py"))
|
||||
with patch.dict(sys.modules, {"pkgmgr": fake_pkgmgr}):
|
||||
found = config_cmd._find_defaults_source_dir()
|
||||
|
||||
self.assertEqual(
|
||||
Path(found).resolve(), (pkg_root.parent / "config").resolve()
|
||||
)
|
||||
|
||||
def test_returns_none_when_no_config_dirs_exist(self):
|
||||
with tempfile.TemporaryDirectory() as td:
|
||||
root = Path(td)
|
||||
pkg_root = root / "site-packages" / "pkgmgr"
|
||||
pkg_root.mkdir(parents=True)
|
||||
|
||||
fake_pkgmgr = types.SimpleNamespace(__file__=str(pkg_root / "__init__.py"))
|
||||
with patch.dict(sys.modules, {"pkgmgr": fake_pkgmgr}):
|
||||
found = config_cmd._find_defaults_source_dir()
|
||||
|
||||
self.assertIsNone(found)
|
||||
|
||||
|
||||
class UpdateDefaultConfigsTests(unittest.TestCase):
|
||||
def test_copies_yaml_files_skips_config_yaml(self):
|
||||
with tempfile.TemporaryDirectory() as td:
|
||||
root = Path(td)
|
||||
source_dir = root / "src"
|
||||
source_dir.mkdir()
|
||||
|
||||
# Create files
|
||||
(source_dir / "a.yaml").write_text("x: 1\n", encoding="utf-8")
|
||||
(source_dir / "b.yml").write_text("y: 2\n", encoding="utf-8")
|
||||
(source_dir / "config.yaml").write_text(
|
||||
"should_not_copy: true\n", encoding="utf-8"
|
||||
)
|
||||
(source_dir / "notes.txt").write_text("nope\n", encoding="utf-8")
|
||||
|
||||
home = root / "home"
|
||||
dest_cfg_dir = home / ".config" / "pkgmgr"
|
||||
dest_cfg_dir.mkdir(parents=True)
|
||||
user_config_path = str(dest_cfg_dir / "config.yaml")
|
||||
|
||||
# Patch the source dir finder to our temp source_dir
|
||||
with patch.object(
|
||||
config_cmd, "_find_defaults_source_dir", return_value=str(source_dir)
|
||||
):
|
||||
with patch.dict(os.environ, {"HOME": str(home)}):
|
||||
config_cmd._update_default_configs(user_config_path)
|
||||
|
||||
self.assertTrue((dest_cfg_dir / "a.yaml").is_file())
|
||||
self.assertTrue((dest_cfg_dir / "b.yml").is_file())
|
||||
self.assertFalse(
|
||||
(dest_cfg_dir / "config.yaml")
|
||||
.read_text(encoding="utf-8")
|
||||
.startswith("should_not_copy")
|
||||
)
|
||||
|
||||
# Ensure config.yaml was not overwritten (it may exist, but should remain original if we create it)
|
||||
# We'll strengthen: create an original config.yaml then re-run
|
||||
(dest_cfg_dir / "config.yaml").write_text(
|
||||
"original: true\n", encoding="utf-8"
|
||||
)
|
||||
|
||||
with patch.object(
|
||||
config_cmd, "_find_defaults_source_dir", return_value=str(source_dir)
|
||||
):
|
||||
with patch.dict(os.environ, {"HOME": str(home)}):
|
||||
config_cmd._update_default_configs(user_config_path)
|
||||
|
||||
self.assertEqual(
|
||||
(dest_cfg_dir / "config.yaml").read_text(encoding="utf-8"),
|
||||
"original: true\n",
|
||||
)
|
||||
|
||||
def test_prints_warning_and_returns_when_no_source_dir(self):
|
||||
with tempfile.TemporaryDirectory() as td:
|
||||
root = Path(td)
|
||||
home = root / "home"
|
||||
dest_cfg_dir = home / ".config" / "pkgmgr"
|
||||
dest_cfg_dir.mkdir(parents=True)
|
||||
user_config_path = str(dest_cfg_dir / "config.yaml")
|
||||
|
||||
buf = io.StringIO()
|
||||
with patch.object(
|
||||
config_cmd, "_find_defaults_source_dir", return_value=None
|
||||
):
|
||||
with patch("sys.stdout", buf):
|
||||
with patch.dict(os.environ, {"HOME": str(home)}):
|
||||
config_cmd._update_default_configs(user_config_path)
|
||||
|
||||
out = buf.getvalue()
|
||||
self.assertIn("[WARN] No config directory found", out)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
271
tests/unit/pkgmgr/core/config/test_load.py
Normal file
271
tests/unit/pkgmgr/core/config/test_load.py
Normal file
@@ -0,0 +1,271 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
import types
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
import yaml
|
||||
|
||||
from pkgmgr.core.config.load import (
|
||||
_deep_merge,
|
||||
_merge_repo_lists,
|
||||
_load_layer_dir,
|
||||
_load_defaults_from_package_or_project,
|
||||
load_config,
|
||||
)
|
||||
|
||||
|
||||
class DeepMergeTests(unittest.TestCase):
|
||||
def test_deep_merge_overrides_scalars_and_merges_dicts(self):
|
||||
base = {"a": 1, "b": {"x": 1, "y": 2}, "c": {"k": 1}}
|
||||
override = {"a": 2, "b": {"y": 99, "z": 3}, "c": 7}
|
||||
merged = _deep_merge(base, override)
|
||||
|
||||
self.assertEqual(merged["a"], 2)
|
||||
self.assertEqual(merged["b"]["x"], 1)
|
||||
self.assertEqual(merged["b"]["y"], 99)
|
||||
self.assertEqual(merged["b"]["z"], 3)
|
||||
self.assertEqual(merged["c"], 7)
|
||||
|
||||
|
||||
class MergeRepoListsTests(unittest.TestCase):
|
||||
def test_merge_repo_lists_adds_new_repo_and_tracks_category(self):
|
||||
base = []
|
||||
new = [{"provider": "github", "account": "a", "repository": "r", "x": 1}]
|
||||
_merge_repo_lists(base, new, category_name="cat1")
|
||||
|
||||
self.assertEqual(len(base), 1)
|
||||
self.assertEqual(base[0]["provider"], "github")
|
||||
self.assertEqual(base[0]["x"], 1)
|
||||
self.assertIn("category_files", base[0])
|
||||
self.assertIn("cat1", base[0]["category_files"])
|
||||
|
||||
def test_merge_repo_lists_merges_existing_repo_fields(self):
|
||||
base = [
|
||||
{
|
||||
"provider": "github",
|
||||
"account": "a",
|
||||
"repository": "r",
|
||||
"x": 1,
|
||||
"d": {"a": 1},
|
||||
}
|
||||
]
|
||||
new = [
|
||||
{
|
||||
"provider": "github",
|
||||
"account": "a",
|
||||
"repository": "r",
|
||||
"x": 2,
|
||||
"d": {"b": 2},
|
||||
}
|
||||
]
|
||||
_merge_repo_lists(base, new, category_name="cat2")
|
||||
|
||||
self.assertEqual(len(base), 1)
|
||||
self.assertEqual(base[0]["x"], 2)
|
||||
self.assertEqual(base[0]["d"]["a"], 1)
|
||||
self.assertEqual(base[0]["d"]["b"], 2)
|
||||
self.assertIn("cat2", base[0]["category_files"])
|
||||
|
||||
def test_merge_repo_lists_incomplete_key_appends(self):
|
||||
base = []
|
||||
new = [{"foo": "bar"}] # no provider/account/repository
|
||||
_merge_repo_lists(base, new, category_name="cat")
|
||||
|
||||
self.assertEqual(len(base), 1)
|
||||
self.assertEqual(base[0]["foo"], "bar")
|
||||
self.assertIn("cat", base[0].get("category_files", []))
|
||||
|
||||
|
||||
class LoadLayerDirTests(unittest.TestCase):
|
||||
def test_load_layer_dir_merges_directories_and_repos_across_files_sorted(self):
|
||||
with tempfile.TemporaryDirectory() as td:
|
||||
cfg_dir = Path(td)
|
||||
|
||||
# 10_b.yaml should be applied after 01_a.yaml due to name sorting
|
||||
(cfg_dir / "01_a.yaml").write_text(
|
||||
yaml.safe_dump(
|
||||
{
|
||||
"directories": {"repositories": "/opt/Repos"},
|
||||
"repositories": [
|
||||
{
|
||||
"provider": "github",
|
||||
"account": "a",
|
||||
"repository": "r1",
|
||||
"x": 1,
|
||||
}
|
||||
],
|
||||
}
|
||||
),
|
||||
encoding="utf-8",
|
||||
)
|
||||
(cfg_dir / "10_b.yaml").write_text(
|
||||
yaml.safe_dump(
|
||||
{
|
||||
"directories": {"binaries": "/usr/local/bin"},
|
||||
"repositories": [
|
||||
{
|
||||
"provider": "github",
|
||||
"account": "a",
|
||||
"repository": "r1",
|
||||
"x": 2,
|
||||
},
|
||||
{"provider": "github", "account": "a", "repository": "r2"},
|
||||
],
|
||||
}
|
||||
),
|
||||
encoding="utf-8",
|
||||
)
|
||||
|
||||
defaults = _load_layer_dir(cfg_dir, skip_filename="config.yaml")
|
||||
|
||||
self.assertEqual(defaults["directories"]["repositories"], "/opt/Repos")
|
||||
self.assertEqual(defaults["directories"]["binaries"], "/usr/local/bin")
|
||||
|
||||
# r1 merged: x becomes 2 and has category_files including both stems
|
||||
repos = defaults["repositories"]
|
||||
self.assertEqual(len(repos), 2)
|
||||
r1 = next(r for r in repos if r["repository"] == "r1")
|
||||
self.assertEqual(r1["x"], 2)
|
||||
self.assertIn("01_a", r1.get("category_files", []))
|
||||
self.assertIn("10_b", r1.get("category_files", []))
|
||||
|
||||
def test_load_layer_dir_skips_config_yaml(self):
|
||||
with tempfile.TemporaryDirectory() as td:
|
||||
cfg_dir = Path(td)
|
||||
(cfg_dir / "config.yaml").write_text(
|
||||
yaml.safe_dump({"directories": {"x": 1}}), encoding="utf-8"
|
||||
)
|
||||
(cfg_dir / "defaults.yaml").write_text(
|
||||
yaml.safe_dump({"directories": {"x": 2}}), encoding="utf-8"
|
||||
)
|
||||
|
||||
defaults = _load_layer_dir(cfg_dir, skip_filename="config.yaml")
|
||||
# only defaults.yaml should apply
|
||||
self.assertEqual(defaults["directories"]["x"], 2)
|
||||
|
||||
|
||||
class DefaultsFromPackageOrProjectTests(unittest.TestCase):
|
||||
def test_defaults_from_pkg_root_config_wins(self):
|
||||
with tempfile.TemporaryDirectory() as td:
|
||||
root = Path(td)
|
||||
pkg_root = root / "site-packages" / "pkgmgr"
|
||||
cfg_dir = pkg_root / "config"
|
||||
cfg_dir.mkdir(parents=True)
|
||||
|
||||
(cfg_dir / "defaults.yaml").write_text(
|
||||
yaml.safe_dump(
|
||||
{"directories": {"repositories": "/opt/Repos"}, "repositories": []}
|
||||
),
|
||||
encoding="utf-8",
|
||||
)
|
||||
|
||||
fake_pkgmgr = types.SimpleNamespace(__file__=str(pkg_root / "__init__.py"))
|
||||
with patch.dict(sys.modules, {"pkgmgr": fake_pkgmgr}):
|
||||
defaults = _load_defaults_from_package_or_project()
|
||||
|
||||
self.assertEqual(defaults["directories"]["repositories"], "/opt/Repos")
|
||||
|
||||
def test_defaults_from_repo_root_src_layout(self):
|
||||
with tempfile.TemporaryDirectory() as td:
|
||||
repo_root = Path(td) / "repo"
|
||||
pkg_root = repo_root / "src" / "pkgmgr"
|
||||
cfg_dir = repo_root / "config"
|
||||
cfg_dir.mkdir(parents=True)
|
||||
pkg_root.mkdir(parents=True)
|
||||
|
||||
(cfg_dir / "defaults.yaml").write_text(
|
||||
yaml.safe_dump(
|
||||
{"directories": {"binaries": "/usr/local/bin"}, "repositories": []}
|
||||
),
|
||||
encoding="utf-8",
|
||||
)
|
||||
|
||||
fake_pkgmgr = types.SimpleNamespace(__file__=str(pkg_root / "__init__.py"))
|
||||
with patch.dict(sys.modules, {"pkgmgr": fake_pkgmgr}):
|
||||
defaults = _load_defaults_from_package_or_project()
|
||||
|
||||
self.assertEqual(defaults["directories"]["binaries"], "/usr/local/bin")
|
||||
|
||||
def test_defaults_returns_empty_when_no_config_found(self):
|
||||
with tempfile.TemporaryDirectory() as td:
|
||||
pkg_root = Path(td) / "site-packages" / "pkgmgr"
|
||||
pkg_root.mkdir(parents=True)
|
||||
fake_pkgmgr = types.SimpleNamespace(__file__=str(pkg_root / "__init__.py"))
|
||||
|
||||
with patch.dict(sys.modules, {"pkgmgr": fake_pkgmgr}):
|
||||
defaults = _load_defaults_from_package_or_project()
|
||||
|
||||
self.assertEqual(defaults, {"directories": {}, "repositories": []})
|
||||
|
||||
|
||||
class LoadConfigIntegrationUnitTests(unittest.TestCase):
|
||||
def test_load_config_prefers_user_dir_defaults_over_package_defaults(self):
|
||||
with tempfile.TemporaryDirectory() as td:
|
||||
home = Path(td) / "home"
|
||||
user_cfg_dir = home / ".config" / "pkgmgr"
|
||||
user_cfg_dir.mkdir(parents=True)
|
||||
user_config_path = str(user_cfg_dir / "config.yaml")
|
||||
|
||||
# user dir defaults exist -> should be used, package fallback must not matter
|
||||
(user_cfg_dir / "aa.yaml").write_text(
|
||||
yaml.safe_dump({"directories": {"repositories": "/USER/Repos"}}),
|
||||
encoding="utf-8",
|
||||
)
|
||||
(user_cfg_dir / "config.yaml").write_text(
|
||||
yaml.safe_dump({"directories": {"binaries": "/USER/bin"}}),
|
||||
encoding="utf-8",
|
||||
)
|
||||
|
||||
with patch.dict(os.environ, {"HOME": str(home)}):
|
||||
merged = load_config(user_config_path)
|
||||
|
||||
self.assertEqual(merged["directories"]["repositories"], "/USER/Repos")
|
||||
self.assertEqual(merged["directories"]["binaries"], "/USER/bin")
|
||||
|
||||
def test_load_config_falls_back_to_package_when_user_dir_has_no_defaults(self):
|
||||
with tempfile.TemporaryDirectory() as td:
|
||||
home = Path(td) / "home"
|
||||
user_cfg_dir = home / ".config" / "pkgmgr"
|
||||
user_cfg_dir.mkdir(parents=True)
|
||||
user_config_path = str(user_cfg_dir / "config.yaml")
|
||||
|
||||
# Only user config exists, no other yaml defaults
|
||||
(user_cfg_dir / "config.yaml").write_text(
|
||||
yaml.safe_dump({"directories": {"x": 1}}), encoding="utf-8"
|
||||
)
|
||||
|
||||
# Provide package defaults via fake pkgmgr + pkg_root/config
|
||||
root = Path(td) / "site-packages"
|
||||
pkg_root = root / "pkgmgr"
|
||||
cfg_dir = (
|
||||
root / "config"
|
||||
) # NOTE: load.py checks multiple roots, including pkg_root.parent (=site-packages)
|
||||
pkg_root.mkdir(parents=True)
|
||||
cfg_dir.mkdir(parents=True)
|
||||
|
||||
(cfg_dir / "defaults.yaml").write_text(
|
||||
yaml.safe_dump(
|
||||
{"directories": {"repositories": "/PKG/Repos"}, "repositories": []}
|
||||
),
|
||||
encoding="utf-8",
|
||||
)
|
||||
|
||||
fake_pkgmgr = types.SimpleNamespace(__file__=str(pkg_root / "__init__.py"))
|
||||
with patch.dict(sys.modules, {"pkgmgr": fake_pkgmgr}):
|
||||
with patch.dict(os.environ, {"HOME": str(home)}):
|
||||
merged = load_config(user_config_path)
|
||||
|
||||
# directories are merged: defaults then user
|
||||
self.assertEqual(merged["directories"]["repositories"], "/PKG/Repos")
|
||||
self.assertEqual(merged["directories"]["x"], 1)
|
||||
self.assertIn("repositories", merged)
|
||||
self.assertIsInstance(merged["repositories"], list)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -0,0 +1,145 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import importlib
|
||||
import unittest
|
||||
from unittest.mock import patch
|
||||
|
||||
from pkgmgr.core.git.errors import GitRunError
|
||||
|
||||
# IMPORTANT:
|
||||
# Import the MODULE, not the function exported by pkgmgr.core.git.queries.__init__.
|
||||
pr = importlib.import_module("pkgmgr.core.git.queries.probe_remote_reachable")
|
||||
|
||||
|
||||
def _git_error(
|
||||
*,
|
||||
returncode: int,
|
||||
stderr: str = "",
|
||||
stdout: str = "",
|
||||
message: str = "git failed",
|
||||
) -> GitRunError:
|
||||
"""
|
||||
Create a GitRunError that mimics what pkgmgr.core.git.run attaches.
|
||||
"""
|
||||
exc = GitRunError(message)
|
||||
exc.returncode = returncode
|
||||
exc.stderr = stderr
|
||||
exc.stdout = stdout
|
||||
return exc
|
||||
|
||||
|
||||
class TestProbeRemoteReachableHelpers(unittest.TestCase):
|
||||
def test_first_useful_line_prefers_keyword_lines(self) -> None:
|
||||
text = "\nerror:\n \nFATAL: Could not read from remote repository.\nmore\n"
|
||||
self.assertEqual(
|
||||
pr._first_useful_line(text),
|
||||
"FATAL: Could not read from remote repository.",
|
||||
)
|
||||
|
||||
def test_first_useful_line_skips_plain_error_if_possible(self) -> None:
|
||||
text = "error:\nsome other info\n"
|
||||
self.assertEqual(pr._first_useful_line(text), "some other info")
|
||||
|
||||
def test_first_useful_line_returns_empty_for_empty(self) -> None:
|
||||
self.assertEqual(pr._first_useful_line(" \n\n"), "")
|
||||
|
||||
def test_looks_like_real_transport_error_true(self) -> None:
|
||||
self.assertTrue(
|
||||
pr._looks_like_real_transport_error(
|
||||
"fatal: Could not read from remote repository."
|
||||
)
|
||||
)
|
||||
|
||||
def test_looks_like_real_transport_error_false(self) -> None:
|
||||
self.assertFalse(pr._looks_like_real_transport_error("some harmless output"))
|
||||
|
||||
|
||||
class TestProbeRemoteReachableDetail(unittest.TestCase):
|
||||
@patch.object(pr, "run", return_value="")
|
||||
def test_detail_success_returns_true_empty_reason(self, m_run) -> None:
|
||||
ok, reason = pr.probe_remote_reachable_detail(
|
||||
"git@github.com:alice/repo.git",
|
||||
cwd="/tmp",
|
||||
)
|
||||
self.assertTrue(ok)
|
||||
self.assertEqual(reason, "")
|
||||
m_run.assert_called_once()
|
||||
|
||||
@patch.object(pr, "run")
|
||||
def test_detail_rc2_without_transport_indicators_treated_as_reachable(
|
||||
self, m_run
|
||||
) -> None:
|
||||
# rc=2 but no transport/auth indicators => treat as reachable (empty repo)
|
||||
m_run.side_effect = _git_error(
|
||||
returncode=2,
|
||||
stderr="",
|
||||
stdout="",
|
||||
message="Git command failed (exit 2)",
|
||||
)
|
||||
|
||||
ok, reason = pr.probe_remote_reachable_detail(
|
||||
"git@github.com:alice/empty.git",
|
||||
cwd="/tmp",
|
||||
)
|
||||
self.assertTrue(ok)
|
||||
self.assertIn("empty repository", reason.lower())
|
||||
|
||||
@patch.object(pr, "run")
|
||||
def test_detail_rc2_with_transport_indicators_is_not_reachable(self, m_run) -> None:
|
||||
# rc=2 but stderr indicates transport/auth problem => NOT reachable
|
||||
m_run.side_effect = _git_error(
|
||||
returncode=2,
|
||||
stderr="ERROR: Repository not found.",
|
||||
stdout="",
|
||||
message="Git command failed (exit 2)",
|
||||
)
|
||||
|
||||
ok, reason = pr.probe_remote_reachable_detail(
|
||||
"git@github.com:alice/missing.git",
|
||||
cwd="/tmp",
|
||||
)
|
||||
self.assertFalse(ok)
|
||||
self.assertIn("repository not found", reason.lower())
|
||||
|
||||
@patch.object(pr, "run")
|
||||
def test_detail_rc128_reports_reason(self, m_run) -> None:
|
||||
m_run.side_effect = _git_error(
|
||||
returncode=128,
|
||||
stderr="fatal: Could not read from remote repository.",
|
||||
stdout="",
|
||||
message="Git command failed (exit 128)",
|
||||
)
|
||||
|
||||
ok, reason = pr.probe_remote_reachable_detail(
|
||||
"ssh://git@host:2201/a/b.git",
|
||||
cwd="/tmp",
|
||||
)
|
||||
self.assertFalse(ok)
|
||||
self.assertIn("(exit 128)", reason.lower())
|
||||
self.assertIn("could not read from remote repository", reason.lower())
|
||||
|
||||
@patch.object(pr, "run")
|
||||
def test_detail_adds_hint_if_reason_is_generic(self, m_run) -> None:
|
||||
# Generic failure: rc=128 but no stderr/stdout => should append hint
|
||||
m_run.side_effect = _git_error(
|
||||
returncode=128,
|
||||
stderr="",
|
||||
stdout="",
|
||||
message="",
|
||||
)
|
||||
|
||||
url = "git@github.com:alice/repo.git"
|
||||
ok, reason = pr.probe_remote_reachable_detail(url, cwd="/tmp")
|
||||
|
||||
self.assertFalse(ok)
|
||||
self.assertIn("hint:", reason.lower())
|
||||
self.assertIn("git ls-remote --exit-code", reason.lower())
|
||||
|
||||
@patch.object(pr, "probe_remote_reachable_detail", return_value=(True, ""))
|
||||
def test_probe_remote_reachable_delegates_to_detail(self, m_detail) -> None:
|
||||
self.assertTrue(pr.probe_remote_reachable("x", cwd="/tmp"))
|
||||
m_detail.assert_called_once_with("x", cwd="/tmp")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
227
tests/unit/pkgmgr/core/remote_provisioning/test_visibility.py
Normal file
227
tests/unit/pkgmgr/core/remote_provisioning/test_visibility.py
Normal file
@@ -0,0 +1,227 @@
|
||||
# tests/unit/pkgmgr/core/remote_provisioning/test_visibility.py
|
||||
from __future__ import annotations
|
||||
|
||||
import unittest
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
from pkgmgr.core.remote_provisioning.types import (
|
||||
AuthError,
|
||||
NetworkError,
|
||||
PermissionError,
|
||||
ProviderHint,
|
||||
RepoSpec,
|
||||
UnsupportedProviderError,
|
||||
)
|
||||
from pkgmgr.core.remote_provisioning.visibility import (
|
||||
VisibilityOptions,
|
||||
set_repo_visibility,
|
||||
)
|
||||
from pkgmgr.core.remote_provisioning.http.errors import HttpError
|
||||
|
||||
|
||||
class TestSetRepoVisibility(unittest.TestCase):
|
||||
def _mk_provider(self, *, kind: str = "gitea") -> MagicMock:
|
||||
p = MagicMock()
|
||||
p.kind = kind
|
||||
return p
|
||||
|
||||
def _mk_registry(
|
||||
self, provider: MagicMock | None, providers: list[MagicMock] | None = None
|
||||
) -> MagicMock:
|
||||
reg = MagicMock()
|
||||
reg.resolve.return_value = provider
|
||||
reg.providers = (
|
||||
providers
|
||||
if providers is not None
|
||||
else ([provider] if provider is not None else [])
|
||||
)
|
||||
return reg
|
||||
|
||||
def _mk_token_resolver(self, token: str = "TOKEN") -> MagicMock:
|
||||
resolver = MagicMock()
|
||||
tok = MagicMock()
|
||||
tok.token = token
|
||||
resolver.get_token.return_value = tok
|
||||
return resolver
|
||||
|
||||
def test_preview_returns_skipped_and_does_not_call_provider(self) -> None:
|
||||
provider = self._mk_provider()
|
||||
reg = self._mk_registry(provider)
|
||||
resolver = self._mk_token_resolver()
|
||||
|
||||
spec = RepoSpec(host="git.veen.world", owner="me", name="repo", private=True)
|
||||
|
||||
res = set_repo_visibility(
|
||||
spec,
|
||||
private=False,
|
||||
options=VisibilityOptions(preview=True),
|
||||
registry=reg,
|
||||
token_resolver=resolver,
|
||||
)
|
||||
|
||||
self.assertEqual(res.status, "skipped")
|
||||
provider.get_repo_private.assert_not_called()
|
||||
provider.set_repo_private.assert_not_called()
|
||||
|
||||
def test_unsupported_provider_raises(self) -> None:
|
||||
reg = self._mk_registry(provider=None, providers=[])
|
||||
|
||||
spec = RepoSpec(host="unknown.host", owner="me", name="repo", private=True)
|
||||
|
||||
with self.assertRaises(UnsupportedProviderError):
|
||||
set_repo_visibility(
|
||||
spec,
|
||||
private=True,
|
||||
registry=reg,
|
||||
token_resolver=self._mk_token_resolver(),
|
||||
)
|
||||
|
||||
def test_notfound_when_provider_returns_none(self) -> None:
|
||||
provider = self._mk_provider()
|
||||
provider.get_repo_private.return_value = None
|
||||
|
||||
reg = self._mk_registry(provider)
|
||||
resolver = self._mk_token_resolver()
|
||||
|
||||
spec = RepoSpec(host="git.veen.world", owner="me", name="repo", private=True)
|
||||
|
||||
res = set_repo_visibility(
|
||||
spec,
|
||||
private=True,
|
||||
registry=reg,
|
||||
token_resolver=resolver,
|
||||
)
|
||||
|
||||
self.assertEqual(res.status, "notfound")
|
||||
provider.set_repo_private.assert_not_called()
|
||||
|
||||
def test_noop_when_already_desired(self) -> None:
|
||||
provider = self._mk_provider()
|
||||
provider.get_repo_private.return_value = True
|
||||
|
||||
reg = self._mk_registry(provider)
|
||||
resolver = self._mk_token_resolver()
|
||||
|
||||
spec = RepoSpec(host="git.veen.world", owner="me", name="repo", private=True)
|
||||
|
||||
res = set_repo_visibility(
|
||||
spec,
|
||||
private=True,
|
||||
registry=reg,
|
||||
token_resolver=resolver,
|
||||
)
|
||||
|
||||
self.assertEqual(res.status, "noop")
|
||||
provider.set_repo_private.assert_not_called()
|
||||
|
||||
def test_updated_when_needs_change(self) -> None:
|
||||
provider = self._mk_provider()
|
||||
provider.get_repo_private.return_value = True
|
||||
|
||||
reg = self._mk_registry(provider)
|
||||
resolver = self._mk_token_resolver()
|
||||
|
||||
spec = RepoSpec(host="git.veen.world", owner="me", name="repo", private=True)
|
||||
|
||||
res = set_repo_visibility(
|
||||
spec,
|
||||
private=False,
|
||||
registry=reg,
|
||||
token_resolver=resolver,
|
||||
)
|
||||
|
||||
self.assertEqual(res.status, "updated")
|
||||
provider.set_repo_private.assert_called_once()
|
||||
args, kwargs = provider.set_repo_private.call_args
|
||||
self.assertEqual(kwargs.get("private"), False)
|
||||
|
||||
def test_provider_hint_overrides_registry_resolution(self) -> None:
|
||||
# registry.resolve returns gitea provider, but hint forces github provider
|
||||
gitea = self._mk_provider(kind="gitea")
|
||||
github = self._mk_provider(kind="github")
|
||||
github.get_repo_private.return_value = True
|
||||
|
||||
reg = self._mk_registry(gitea, providers=[gitea, github])
|
||||
resolver = self._mk_token_resolver()
|
||||
|
||||
spec = RepoSpec(host="github.com", owner="me", name="repo", private=True)
|
||||
|
||||
res = set_repo_visibility(
|
||||
spec,
|
||||
private=False,
|
||||
provider_hint=ProviderHint(kind="github"),
|
||||
registry=reg,
|
||||
token_resolver=resolver,
|
||||
)
|
||||
|
||||
self.assertEqual(res.status, "updated")
|
||||
github.get_repo_private.assert_called_once()
|
||||
gitea.get_repo_private.assert_not_called()
|
||||
|
||||
def test_http_error_401_maps_to_auth_error(self) -> None:
|
||||
provider = self._mk_provider()
|
||||
provider.get_repo_private.side_effect = HttpError(
|
||||
status=401, message="nope", body=""
|
||||
)
|
||||
|
||||
reg = self._mk_registry(provider)
|
||||
resolver = self._mk_token_resolver()
|
||||
|
||||
spec = RepoSpec(host="git.veen.world", owner="me", name="repo", private=True)
|
||||
|
||||
with self.assertRaises(AuthError):
|
||||
set_repo_visibility(
|
||||
spec, private=True, registry=reg, token_resolver=resolver
|
||||
)
|
||||
|
||||
def test_http_error_403_maps_to_permission_error(self) -> None:
|
||||
provider = self._mk_provider()
|
||||
provider.get_repo_private.side_effect = HttpError(
|
||||
status=403, message="nope", body=""
|
||||
)
|
||||
|
||||
reg = self._mk_registry(provider)
|
||||
resolver = self._mk_token_resolver()
|
||||
|
||||
spec = RepoSpec(host="git.veen.world", owner="me", name="repo", private=True)
|
||||
|
||||
with self.assertRaises(PermissionError):
|
||||
set_repo_visibility(
|
||||
spec, private=True, registry=reg, token_resolver=resolver
|
||||
)
|
||||
|
||||
def test_http_error_status_0_maps_to_network_error(self) -> None:
|
||||
provider = self._mk_provider()
|
||||
provider.get_repo_private.side_effect = HttpError(
|
||||
status=0, message="connection failed", body=""
|
||||
)
|
||||
|
||||
reg = self._mk_registry(provider)
|
||||
resolver = self._mk_token_resolver()
|
||||
|
||||
spec = RepoSpec(host="git.veen.world", owner="me", name="repo", private=True)
|
||||
|
||||
with self.assertRaises(NetworkError):
|
||||
set_repo_visibility(
|
||||
spec, private=True, registry=reg, token_resolver=resolver
|
||||
)
|
||||
|
||||
def test_http_error_other_maps_to_network_error(self) -> None:
|
||||
provider = self._mk_provider()
|
||||
provider.get_repo_private.side_effect = HttpError(
|
||||
status=500, message="boom", body="server error"
|
||||
)
|
||||
|
||||
reg = self._mk_registry(provider)
|
||||
resolver = self._mk_token_resolver()
|
||||
|
||||
spec = RepoSpec(host="git.veen.world", owner="me", name="repo", private=True)
|
||||
|
||||
with self.assertRaises(NetworkError):
|
||||
set_repo_visibility(
|
||||
spec, private=True, registry=reg, token_resolver=resolver
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user