Compare commits

..

8 Commits

Author SHA1 Message Date
Kevin Veen-Birkenbach
28df54503e Release version 1.9.2
Some checks are pending
Mark stable commit / test-unit (push) Waiting to run
Mark stable commit / test-integration (push) Waiting to run
Mark stable commit / test-env-virtual (push) Waiting to run
Mark stable commit / test-env-nix (push) Waiting to run
Mark stable commit / test-e2e (push) Waiting to run
Mark stable commit / test-virgin-user (push) Waiting to run
Mark stable commit / test-virgin-root (push) Waiting to run
Mark stable commit / lint-shell (push) Waiting to run
Mark stable commit / lint-python (push) Waiting to run
Mark stable commit / mark-stable (push) Blocked by required conditions
2025-12-21 15:30:22 +01:00
Kevin Veen-Birkenbach
aa489811e3 fix(config): package and load default configs correctly
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / lint-shell (push) Has been cancelled
Mark stable commit / lint-python (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
- Ship default YAML configs inside the pkgmgr package
- Ensure defaults are loaded when no user config exists
- Keep user configs fully respected and non-overwritten
- Fix config update command to copy packaged defaults reliably

https://chatgpt.com/share/6947e74f-573c-800f-b93d-5ed341fcd1a3
2025-12-21 15:26:01 +01:00
Kevin Veen-Birkenbach
f66af0157b Release version 1.9.1
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / lint-shell (push) Has been cancelled
Mark stable commit / lint-python (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
2025-12-21 13:38:58 +01:00
Kevin Veen-Birkenbach
b0b3ccf5aa fix(packaging): stop including legacy pkgmgr.installers package
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / lint-shell (push) Has been cancelled
Mark stable commit / lint-python (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
- Restrict setuptools package discovery to src/ (pkgmgr* only)
- Drop config/ as a Python package mapping (keep config as plain data dir)
- Remove config_defaults fallback paths and use config/ exclusively
- Add unit + integration tests for defaults.yaml loading and CLI update copying

https://chatgpt.com/share/6947e74f-573c-800f-b93d-5ed341fcd1a3
2025-12-21 13:25:38 +01:00
Kevin Veen-Birkenbach
e178afde31 Release version 1.9.0
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / lint-shell (push) Has been cancelled
Mark stable commit / lint-python (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
2025-12-20 14:37:58 +01:00
Kevin Veen-Birkenbach
9802293871 ***feat(mirror): add remote repository visibility support***
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / lint-shell (push) Has been cancelled
Mark stable commit / lint-python (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
* Add mirror visibility subcommand and provision --public flag
* Implement core visibility API with provider support (GitHub, Gitea)
* Extend provider interface and EnsureStatus
* Add unit, integration and e2e tests for visibility handling

https://chatgpt.com/share/6946a44e-4f48-800f-8124-9c0b9b2b6b04
2025-12-20 14:26:55 +01:00
Kevin Veen-Birkenbach
a2138c9985 refactor(mirror): probe remotes with detailed reasons and provision all git mirrors
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / lint-shell (push) Has been cancelled
Mark stable commit / lint-python (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
- Add probe_remote_reachable_detail and improved GitRunError metadata
- Print short failure reasons for unreachable remotes
- Provision each git mirror URL via ensure_remote_repository_for_url

https://chatgpt.com/share/6946956e-f738-800f-a446-e2c8bf5595f4
2025-12-20 13:23:24 +01:00
Kevin Veen-Birkenbach
10998e50ad ci(test-virgin-user): preserve NIX_CONFIG across sudo to avoid GitHub API rate limits
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / lint-shell (push) Has been cancelled
Mark stable commit / lint-python (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
https://chatgpt.com/share/6945565e-f1b0-800f-86d5-8d0083fe3390
2025-12-19 14:42:36 +01:00
39 changed files with 2678 additions and 231 deletions

View File

@@ -49,11 +49,13 @@ jobs:
chown -R dev:dev /nix
chmod 0755 /nix
chmod 1777 /nix/store
sudo -H -u dev env HOME=/home/dev PKGMGR_DISABLE_NIX_FLAKE_INSTALLER=1 bash -lc "
sudo -H -u dev env \
HOME=/home/dev \
NIX_CONFIG="$NIX_CONFIG" \
PKGMGR_DISABLE_NIX_FLAKE_INSTALLER=1 \
bash -lc "
set -euo pipefail
cd /opt/src/pkgmgr
make setup-venv
. \"\$HOME/.venvs/pkgmgr/bin/activate\"

View File

@@ -1,3 +1,20 @@
## [1.9.2] - 2025-12-21
* Default configuration files are now packaged and loaded correctly when no user config exists, while fully preserving custom user configurations.
## [1.9.1] - 2025-12-21
* Fixed installation issues and improved loading of default configuration files.
## [1.9.0] - 2025-12-20
* New ***mirror visibility*** command to set remote Git repositories to ***public*** or ***private***.
* New ***--public*** flag for ***mirror provision*** to create repositories and immediately make them public.
* All configured git mirrors are now provisioned.
## [1.8.7] - 2025-12-19
* **Release version updates now correctly modify ***pyproject.toml*** files that follow PEP 621**, ensuring the ***[project].version*** field is updated as expected.

View File

@@ -32,7 +32,7 @@
rec {
pkgmgr = pyPkgs.buildPythonApplication {
pname = "package-manager";
version = "1.8.7";
version = "1.9.2";
# Use the git repo as source
src = ./.;

View File

@@ -1,7 +1,7 @@
# Maintainer: Kevin Veen-Birkenbach <info@veen.world>
pkgname=package-manager
pkgver=1.8.7
pkgver=1.9.2
pkgrel=1
pkgdesc="Local-flake wrapper for Kevin's package-manager (Nix-based)."
arch=('any')

View File

@@ -1,3 +1,23 @@
package-manager (1.9.2-1) unstable; urgency=medium
* Default configuration files are now packaged and loaded correctly when no user config exists, while fully preserving custom user configurations.
-- Kevin Veen-Birkenbach <kevin@veen.world> Sun, 21 Dec 2025 15:30:22 +0100
package-manager (1.9.1-1) unstable; urgency=medium
* Fixed installation issues and improved loading of default configuration files.
-- Kevin Veen-Birkenbach <kevin@veen.world> Sun, 21 Dec 2025 13:38:58 +0100
package-manager (1.9.0-1) unstable; urgency=medium
* New ***mirror visibility*** command to set remote Git repositories to ***public*** or ***private***.
* New ***--public*** flag for ***mirror provision*** to create repositories and immediately make them public.
* All configured git mirrors are now provisioned.
-- Kevin Veen-Birkenbach <kevin@veen.world> Sat, 20 Dec 2025 14:37:58 +0100
package-manager (1.8.7-1) unstable; urgency=medium
* **Release version updates now correctly modify ***pyproject.toml*** files that follow PEP 621**, ensuring the ***[project].version*** field is updated as expected.

View File

@@ -1,5 +1,5 @@
Name: package-manager
Version: 1.8.7
Version: 1.9.2
Release: 1%{?dist}
Summary: Wrapper that runs Kevin's package-manager via Nix flake
@@ -74,6 +74,17 @@ echo ">>> package-manager removed. Nix itself was not removed."
/usr/lib/package-manager/
%changelog
* Sun Dec 21 2025 Kevin Veen-Birkenbach <kevin@veen.world> - 1.9.2-1
- Default configuration files are now packaged and loaded correctly when no user config exists, while fully preserving custom user configurations.
* Sun Dec 21 2025 Kevin Veen-Birkenbach <kevin@veen.world> - 1.9.1-1
- Fixed installation issues and improved loading of default configuration files.
* Sat Dec 20 2025 Kevin Veen-Birkenbach <kevin@veen.world> - 1.9.0-1
- * New ***mirror visibility*** command to set remote Git repositories to ***public*** or ***private***.
* New ***--public*** flag for ***mirror provision*** to create repositories and immediately make them public.
* All configured git mirrors are now provisioned.
* Fri Dec 19 2025 Kevin Veen-Birkenbach <kevin@veen.world> - 1.8.7-1
- * **Release version updates now correctly modify ***pyproject.toml*** files that follow PEP 621**, ensuring the ***[project].version*** field is updated as expected.
* **Invalid or incomplete ***pyproject.toml*** files are now handled gracefully** with clear error messages instead of abrupt process termination.

View File

@@ -7,7 +7,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "kpmx"
version = "1.8.7"
version = "1.9.2"
description = "Kevin's package-manager tool (pkgmgr)"
readme = "README.md"
requires-python = ">=3.9"
@@ -43,11 +43,12 @@ pkgmgr = "pkgmgr.cli:main"
# -----------------------------
# Source layout: all packages live under "src/"
[tool.setuptools]
package-dir = { "" = "src", "config" = "config" }
package-dir = { "" = "src" }
include-package-data = true
[tool.setuptools.packages.find]
where = ["src", "."]
include = ["pkgmgr*", "config*"]
where = ["src"]
include = ["pkgmgr*"]
[tool.setuptools.package-data]
"config" = ["defaults.yaml"]
"pkgmgr.config" = ["*.yml", "*.yaml"]

View File

@@ -14,6 +14,7 @@ from .list_cmd import list_mirrors
from .diff_cmd import diff_mirrors
from .merge_cmd import merge_mirrors
from .setup_cmd import setup_mirrors
from .visibility_cmd import set_mirror_visibility
__all__ = [
"Repository",
@@ -22,4 +23,5 @@ __all__ = [
"diff_mirrors",
"merge_mirrors",
"setup_mirrors",
"set_mirror_visibility",
]

View File

@@ -11,35 +11,37 @@ from .types import Repository
from .url_utils import normalize_provider_host, parse_repo_from_git_url
def ensure_remote_repository(
repo: Repository,
repositories_base_dir: str,
all_repos: List[Repository],
def _provider_hint_from_host(host: str) -> str | None:
h = (host or "").lower()
if h == "github.com":
return "github"
# Best-effort default for self-hosted git domains
return "gitea" if h else None
def ensure_remote_repository_for_url(
*,
url: str,
private_default: bool,
description: str,
preview: bool,
) -> None:
ctx = build_context(repo, repositories_base_dir, all_repos)
primary_url = determine_primary_remote_url(repo, ctx)
if not primary_url:
print("[INFO] No primary URL found; skipping remote provisioning.")
return
host_raw, owner, name = parse_repo_from_git_url(primary_url)
host_raw, owner, name = parse_repo_from_git_url(url)
host = normalize_provider_host(host_raw)
if not host or not owner or not name:
print("[WARN] Could not parse remote URL:", primary_url)
print(f"[WARN] Could not parse repo from URL: {url}")
return
spec = RepoSpec(
host=host,
owner=owner,
name=name,
private=bool(repo.get("private", True)),
description=str(repo.get("description", "")),
private=private_default,
description=description,
)
provider_kind = str(repo.get("provider", "")).lower() or None
provider_kind = _provider_hint_from_host(host)
try:
result = ensure_remote_repo(
@@ -56,4 +58,29 @@ def ensure_remote_repository(
if result.url:
print(f"[REMOTE ENSURE] URL: {result.url}")
except Exception as exc: # noqa: BLE001
print(f"[ERROR] Remote provisioning failed: {exc}")
print(f"[ERROR] Remote provisioning failed for {url!r}: {exc}")
def ensure_remote_repository(
repo: Repository,
repositories_base_dir: str,
all_repos: List[Repository],
preview: bool,
) -> None:
"""
Backwards-compatible wrapper: ensure the *primary* remote repository
derived from the primary URL.
"""
ctx = build_context(repo, repositories_base_dir, all_repos)
primary_url = determine_primary_remote_url(repo, ctx)
if not primary_url:
print("[INFO] No primary URL found; skipping remote provisioning.")
return
ensure_remote_repository_for_url(
url=primary_url,
private_default=bool(repo.get("private", True)),
description=str(repo.get("description", "")),
preview=preview,
)

View File

@@ -2,12 +2,15 @@ from __future__ import annotations
from typing import List
from pkgmgr.core.git.queries import probe_remote_reachable
from pkgmgr.core.git.queries import probe_remote_reachable_detail
from pkgmgr.core.remote_provisioning import ProviderHint, RepoSpec, set_repo_visibility
from pkgmgr.core.remote_provisioning.visibility import VisibilityOptions
from .context import build_context
from .git_remote import ensure_origin_remote, determine_primary_remote_url
from .remote_provision import ensure_remote_repository
from .git_remote import determine_primary_remote_url, ensure_origin_remote
from .remote_provision import ensure_remote_repository_for_url
from .types import Repository
from .url_utils import normalize_provider_host, parse_repo_from_git_url
def _is_git_remote_url(url: str) -> bool:
@@ -25,6 +28,64 @@ def _is_git_remote_url(url: str) -> bool:
return False
def _provider_hint_from_host(host: str) -> str | None:
h = (host or "").lower()
if h == "github.com":
return "github"
return "gitea" if h else None
def _apply_visibility_for_url(
*,
url: str,
private: bool,
description: str,
preview: bool,
) -> None:
host_raw, owner, name = parse_repo_from_git_url(url)
host = normalize_provider_host(host_raw)
if not host or not owner or not name:
print(f"[WARN] Could not parse repo from URL: {url}")
return
spec = RepoSpec(
host=host,
owner=owner,
name=name,
private=private,
description=description,
)
provider_kind = _provider_hint_from_host(host)
res = set_repo_visibility(
spec,
private=private,
provider_hint=ProviderHint(kind=provider_kind),
options=VisibilityOptions(preview=preview),
)
print(f"[REMOTE VISIBILITY] {res.status.upper()}: {res.message}")
def _print_probe_result(name: str | None, url: str, *, cwd: str) -> None:
"""
Print probe result for a git remote URL, including a short failure reason.
"""
ok, reason = probe_remote_reachable_detail(url, cwd=cwd)
prefix = f"{name}: " if name else ""
if ok:
print(f"[OK] {prefix}{url}")
return
print(f"[WARN] {prefix}{url}")
if reason:
reason = reason.strip()
if len(reason) > 240:
reason = reason[:240].rstrip() + "…"
print(f" reason: {reason}")
def _setup_local_mirrors_for_repo(
repo: Repository,
repositories_base_dir: str,
@@ -48,6 +109,7 @@ def _setup_remote_mirrors_for_repo(
all_repos: List[Repository],
preview: bool,
ensure_remote: bool,
ensure_visibility: str | None,
) -> None:
ctx = build_context(repo, repositories_base_dir, all_repos)
@@ -56,35 +118,78 @@ def _setup_remote_mirrors_for_repo(
print(f"[MIRROR SETUP:REMOTE] dir: {ctx.repo_dir}")
print("------------------------------------------------------------")
if ensure_remote:
ensure_remote_repository(
repo,
repositories_base_dir,
all_repos,
preview,
)
# Probe only git URLs (do not try ls-remote against PyPI etc.)
# If there are no mirrors at all, probe the primary git URL.
git_mirrors = {
k: v for k, v in ctx.resolved_mirrors.items() if _is_git_remote_url(v)
}
def _desired_private_default() -> bool:
# default behavior: repo['private'] (or True)
if ensure_visibility == "public":
return False
if ensure_visibility == "private":
return True
return bool(repo.get("private", True))
def _should_enforce_visibility() -> bool:
return ensure_visibility in ("public", "private")
def _visibility_private_value() -> bool:
return ensure_visibility == "private"
description = str(repo.get("description", ""))
# If there are no git mirrors, fall back to primary (git) URL.
if not git_mirrors:
primary = determine_primary_remote_url(repo, ctx)
if not primary or not _is_git_remote_url(primary):
print("[INFO] No git mirrors to probe.")
print("[INFO] No git mirrors to probe or provision.")
print()
return
ok = probe_remote_reachable(primary, cwd=ctx.repo_dir)
print("[OK]" if ok else "[WARN]", primary)
if ensure_remote:
print(f"[REMOTE ENSURE] ensuring primary: {primary}")
ensure_remote_repository_for_url(
url=primary,
private_default=_desired_private_default(),
description=description,
preview=preview,
)
# IMPORTANT: enforce visibility only if requested
if _should_enforce_visibility():
_apply_visibility_for_url(
url=primary,
private=_visibility_private_value(),
description=description,
preview=preview,
)
print()
_print_probe_result(None, primary, cwd=ctx.repo_dir)
print()
return
# Provision ALL git mirrors (if requested)
if ensure_remote:
for name, url in git_mirrors.items():
ok = probe_remote_reachable(url, cwd=ctx.repo_dir)
print(f"[OK] {name}: {url}" if ok else f"[WARN] {name}: {url}")
print(f"[REMOTE ENSURE] ensuring mirror {name!r}: {url}")
ensure_remote_repository_for_url(
url=url,
private_default=_desired_private_default(),
description=description,
preview=preview,
)
if _should_enforce_visibility():
_apply_visibility_for_url(
url=url,
private=_visibility_private_value(),
description=description,
preview=preview,
)
print()
# Probe ALL git mirrors
for name, url in git_mirrors.items():
_print_probe_result(name, url, cwd=ctx.repo_dir)
print()
@@ -97,6 +202,7 @@ def setup_mirrors(
local: bool = True,
remote: bool = True,
ensure_remote: bool = False,
ensure_visibility: str | None = None,
) -> None:
for repo in selected_repos:
if local:
@@ -114,4 +220,5 @@ def setup_mirrors(
all_repos,
preview,
ensure_remote,
ensure_visibility,
)

View File

@@ -0,0 +1,134 @@
from __future__ import annotations
from typing import List
from pkgmgr.core.remote_provisioning import ProviderHint, RepoSpec, set_repo_visibility
from pkgmgr.core.remote_provisioning.visibility import VisibilityOptions
from .context import build_context
from .git_remote import determine_primary_remote_url
from .types import Repository
from .url_utils import normalize_provider_host, parse_repo_from_git_url
def _is_git_remote_url(url: str) -> bool:
# Keep same semantics as setup_cmd.py / git_remote.py
u = (url or "").strip()
if not u:
return False
if u.startswith("git@"):
return True
if u.startswith("ssh://"):
return True
if (u.startswith("https://") or u.startswith("http://")) and u.endswith(".git"):
return True
return False
def _provider_hint_from_host(host: str) -> str | None:
h = (host or "").lower()
if h == "github.com":
return "github"
# Best-effort default for self-hosted git domains
return "gitea" if h else None
def _apply_visibility_for_url(
*,
url: str,
private: bool,
description: str,
preview: bool,
) -> None:
host_raw, owner, name = parse_repo_from_git_url(url)
host = normalize_provider_host(host_raw)
if not host or not owner or not name:
print(f"[WARN] Could not parse repo from URL: {url}")
return
spec = RepoSpec(
host=host,
owner=owner,
name=name,
private=private,
description=description,
)
provider_kind = _provider_hint_from_host(host)
res = set_repo_visibility(
spec,
private=private,
provider_hint=ProviderHint(kind=provider_kind),
options=VisibilityOptions(preview=preview),
)
print(f"[REMOTE VISIBILITY] {res.status.upper()}: {res.message}")
def set_mirror_visibility(
selected_repos: List[Repository],
repositories_base_dir: str,
all_repos: List[Repository],
*,
visibility: str,
preview: bool = False,
) -> None:
"""
Set remote repository visibility for all git mirrors of each selected repo.
visibility:
- "private"
- "public"
"""
v = (visibility or "").strip().lower()
if v not in ("private", "public"):
raise ValueError("visibility must be 'private' or 'public'")
desired_private = v == "private"
for repo in selected_repos:
ctx = build_context(repo, repositories_base_dir, all_repos)
print("------------------------------------------------------------")
print(f"[MIRROR VISIBILITY] {ctx.identifier}")
print(f"[MIRROR VISIBILITY] dir: {ctx.repo_dir}")
print(f"[MIRROR VISIBILITY] target: {v}")
print("------------------------------------------------------------")
git_mirrors = {
name: url
for name, url in ctx.resolved_mirrors.items()
if url and _is_git_remote_url(url)
}
# If there are no git mirrors, fall back to primary (git) URL.
if not git_mirrors:
primary = determine_primary_remote_url(repo, ctx)
if not primary or not _is_git_remote_url(primary):
print(
"[INFO] No git mirrors found (and no primary git URL). Nothing to do."
)
print()
continue
print(f"[MIRROR VISIBILITY] applying to primary: {primary}")
_apply_visibility_for_url(
url=primary,
private=desired_private,
description=str(repo.get("description", "")),
preview=preview,
)
print()
continue
# Apply to ALL git mirrors
for name, url in git_mirrors.items():
print(f"[MIRROR VISIBILITY] applying to mirror {name!r}: {url}")
_apply_visibility_for_url(
url=url,
private=desired_private,
description=str(repo.get("description", "")),
preview=preview,
)
print()

View File

@@ -1,3 +1,4 @@
# src/pkgmgr/cli/commands/config.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
@@ -38,27 +39,16 @@ def _load_user_config(user_config_path: str) -> Dict[str, Any]:
def _find_defaults_source_dir() -> Optional[str]:
"""
Find the directory inside the installed pkgmgr package OR the
project root that contains default config files.
Find the directory inside the installed pkgmgr package that contains
the default config files.
Preferred locations (in dieser Reihenfolge):
- <pkg_root>/config_defaults
Preferred location:
- <pkg_root>/config
- <project_root>/config_defaults
- <project_root>/config
"""
import pkgmgr # local import to avoid circular deps
pkg_root = Path(pkgmgr.__file__).resolve().parent
project_root = pkg_root.parent
candidates = [
pkg_root / "config_defaults",
pkg_root / "config",
project_root / "config_defaults",
project_root / "config",
]
for cand in candidates:
cand = pkg_root / "config"
if cand.is_dir():
return str(cand)
return None
@@ -73,7 +63,7 @@ def _update_default_configs(user_config_path: str) -> None:
source_dir = _find_defaults_source_dir()
if not source_dir:
print(
"[WARN] No config_defaults or config directory found in "
"[WARN] No config directory found in "
"pkgmgr installation. Nothing to update."
)
return
@@ -88,7 +78,6 @@ def _update_default_configs(user_config_path: str) -> None:
if not (lower.endswith(".yml") or lower.endswith(".yaml")):
continue
if name == "config.yaml":
# Never overwrite the user config template / live config
continue
src = os.path.join(source_dir, name)
@@ -102,48 +91,28 @@ def handle_config(args, ctx: CLIContext) -> None:
"""
Handle 'pkgmgr config' subcommands.
"""
user_config_path = ctx.user_config_path
# ------------------------------------------------------------
# config show
# ------------------------------------------------------------
if args.subcommand == "show":
if args.all or (not args.identifiers):
# Full merged config view
show_config([], user_config_path, full_config=True)
else:
# Show only matching entries from user config
user_config = _load_user_config(user_config_path)
selected = resolve_repos(
args.identifiers,
user_config.get("repositories", []),
args.identifiers, user_config.get("repositories", [])
)
if selected:
show_config(
selected,
user_config_path,
full_config=False,
)
show_config(selected, user_config_path, full_config=False)
return
# ------------------------------------------------------------
# config add
# ------------------------------------------------------------
if args.subcommand == "add":
interactive_add(ctx.config_merged, user_config_path)
return
# ------------------------------------------------------------
# config edit
# ------------------------------------------------------------
if args.subcommand == "edit":
run_command(f"nano {user_config_path}")
return
# ------------------------------------------------------------
# config init
# ------------------------------------------------------------
if args.subcommand == "init":
user_config = _load_user_config(user_config_path)
config_init(
@@ -154,9 +123,6 @@ def handle_config(args, ctx: CLIContext) -> None:
)
return
# ------------------------------------------------------------
# config delete
# ------------------------------------------------------------
if args.subcommand == "delete":
user_config = _load_user_config(user_config_path)
@@ -167,10 +133,7 @@ def handle_config(args, ctx: CLIContext) -> None:
)
return
to_delete = resolve_repos(
args.identifiers,
user_config.get("repositories", []),
)
to_delete = resolve_repos(args.identifiers, user_config.get("repositories", []))
new_repos = [
entry
for entry in user_config.get("repositories", [])
@@ -181,9 +144,6 @@ def handle_config(args, ctx: CLIContext) -> None:
print(f"Deleted {len(to_delete)} entries from user config.")
return
# ------------------------------------------------------------
# config ignore
# ------------------------------------------------------------
if args.subcommand == "ignore":
user_config = _load_user_config(user_config_path)
@@ -194,17 +154,10 @@ def handle_config(args, ctx: CLIContext) -> None:
)
return
to_modify = resolve_repos(
args.identifiers,
user_config.get("repositories", []),
)
to_modify = resolve_repos(args.identifiers, user_config.get("repositories", []))
for entry in user_config["repositories"]:
key = (
entry.get("provider"),
entry.get("account"),
entry.get("repository"),
)
key = (entry.get("provider"), entry.get("account"), entry.get("repository"))
for mod in to_modify:
mod_key = (
mod.get("provider"),
@@ -218,21 +171,9 @@ def handle_config(args, ctx: CLIContext) -> None:
save_user_config(user_config, user_config_path)
return
# ------------------------------------------------------------
# config update
# ------------------------------------------------------------
if args.subcommand == "update":
"""
Copy default YAML configs from the installed package into the
user's ~/.config/pkgmgr directory.
This will overwrite files with the same name (except config.yaml).
"""
_update_default_configs(user_config_path)
return
# ------------------------------------------------------------
# Unknown subcommand
# ------------------------------------------------------------
print(f"Unknown config subcommand: {args.subcommand}")
sys.exit(2)

View File

@@ -8,6 +8,7 @@ from pkgmgr.actions.mirror import (
diff_mirrors,
list_mirrors,
merge_mirrors,
set_mirror_visibility,
setup_mirrors,
)
from pkgmgr.cli.context import CLIContext
@@ -30,6 +31,7 @@ def handle_mirror_command(
- mirror setup
- mirror check
- mirror provision
- mirror visibility
"""
if not selected:
print("[INFO] No repositories selected for 'mirror' command.")
@@ -92,6 +94,7 @@ def handle_mirror_command(
local=True,
remote=False,
ensure_remote=False,
ensure_visibility=None,
)
return
@@ -105,11 +108,14 @@ def handle_mirror_command(
local=False,
remote=True,
ensure_remote=False,
ensure_visibility=None,
)
return
if subcommand == "provision":
preview = getattr(args, "preview", False)
public = bool(getattr(args, "public", False))
setup_mirrors(
selected_repos=selected,
repositories_base_dir=ctx.repositories_base_dir,
@@ -118,6 +124,23 @@ def handle_mirror_command(
local=False,
remote=True,
ensure_remote=True,
ensure_visibility="public" if public else None,
)
return
if subcommand == "visibility":
preview = getattr(args, "preview", False)
visibility = getattr(args, "visibility", None)
if visibility not in ("private", "public"):
print("[ERROR] mirror visibility expects 'private' or 'public'.")
sys.exit(2)
set_mirror_visibility(
selected_repos=selected,
repositories_base_dir=ctx.repositories_base_dir,
all_repos=ctx.all_repositories,
visibility=visibility,
preview=preview,
)
return

View File

@@ -1,4 +1,3 @@
# src/pkgmgr/cli/parser/mirror_cmd.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
@@ -12,7 +11,7 @@ from .common import add_identifier_arguments
def add_mirror_subparsers(subparsers: argparse._SubParsersAction) -> None:
mirror_parser = subparsers.add_parser(
"mirror",
help="Mirror-related utilities (list, diff, merge, setup, check, provision)",
help="Mirror-related utilities (list, diff, merge, setup, check, provision, visibility)",
)
mirror_subparsers = mirror_parser.add_subparsers(
dest="subcommand",
@@ -68,4 +67,20 @@ def add_mirror_subparsers(subparsers: argparse._SubParsersAction) -> None:
"provision",
help="Provision remote repositories via provider APIs (create missing repos).",
)
mirror_provision.add_argument(
"--public",
action="store_true",
help="After ensuring repos exist, enforce public visibility on the remote provider.",
)
add_identifier_arguments(mirror_provision)
mirror_visibility = mirror_subparsers.add_parser(
"visibility",
help="Set visibility (public/private) for all remote git mirrors via provider APIs.",
)
mirror_visibility.add_argument(
"visibility",
choices=["private", "public"],
help="Target visibility for all git mirrors.",
)
add_identifier_arguments(mirror_visibility)

View File

@@ -1,3 +1,4 @@
# src/pkgmgr/core/config/load.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
@@ -7,31 +8,28 @@ Load and merge pkgmgr configuration.
Layering rules:
1. Defaults / category files:
- Zuerst werden alle *.yml/*.yaml (außer config.yaml) im
Benutzerverzeichnis geladen:
- First load all *.yml/*.yaml (except config.yaml) from the user directory:
~/.config/pkgmgr/
- Falls dort keine passenden Dateien existieren, wird auf die im
Paket / Projekt mitgelieferten Config-Verzeichnisse zurückgegriffen:
- If no matching files exist there, fall back to defaults shipped with pkgmgr:
<pkg_root>/config_defaults
<pkg_root>/config
<project_root>/config_defaults
<project_root>/config
Dabei werden ebenfalls alle *.yml/*.yaml als Layer geladen.
During development (src-layout), we optionally also check:
<repo_root>/config
- Der Dateiname ohne Endung (stem) wird als Kategorie-Name
verwendet und in repo["category_files"] eingetragen.
All *.yml/*.yaml files are loaded as layers.
- The filename stem is used as category name and stored in repo["category_files"].
2. User config:
- ~/.config/pkgmgr/config.yaml (oder der übergebene Pfad)
wird geladen und PER LISTEN-MERGE über die Defaults gelegt:
- ~/.config/pkgmgr/config.yaml (or the provided path)
is loaded and merged over defaults:
- directories: dict deep-merge
- repositories: per _merge_repo_lists (kein Löschen!)
- repositories: per _merge_repo_lists (no deletions!)
3. Ergebnis:
- Ein dict mit mindestens:
3. Result:
- A dict with at least:
config["directories"] (dict)
config["repositories"] (list[dict])
"""
@@ -40,7 +38,7 @@ from __future__ import annotations
import os
from pathlib import Path
from typing import Any, Dict, List, Tuple, Optional
from typing import Any, Dict, List, Optional, Tuple
import yaml
@@ -48,7 +46,7 @@ Repo = Dict[str, Any]
# ---------------------------------------------------------------------------
# Hilfsfunktionen
# Helper functions
# ---------------------------------------------------------------------------
@@ -85,17 +83,16 @@ def _merge_repo_lists(
"""
Merge two repository lists, matching by (provider, account, repository).
- Wenn ein Repo aus new_list noch nicht existiert, wird es hinzugefügt.
- Wenn es existiert, werden seine Felder per Deep-Merge überschrieben.
- Wenn category_name gesetzt ist, wird dieser in
repo["category_files"] eingetragen.
- If a repo from new_list does not exist, it is added.
- If it exists, its fields are deep-merged (override wins).
- If category_name is set, it is appended to repo["category_files"].
"""
index: Dict[Tuple[str, str, str], Repo] = {_repo_key(r): r for r in base_list}
for src in new_list:
key = _repo_key(src)
if key == ("", "", ""):
# Unvollständiger Schlüssel -> einfach anhängen
# Incomplete key -> append as-is
dst = dict(src)
if category_name:
dst.setdefault("category_files", [])
@@ -143,10 +140,9 @@ def _load_layer_dir(
"""
Load all *.yml/*.yaml from a directory as layered defaults.
- skip_filename: Dateiname (z.B. "config.yaml"), der ignoriert
werden soll (z.B. User-Config).
- skip_filename: filename (e.g. "config.yaml") to ignore.
Rückgabe:
Returns:
{
"directories": {...},
"repositories": [...],
@@ -171,7 +167,7 @@ def _load_layer_dir(
for path in yaml_files:
data = _load_yaml_file(path)
category_name = path.stem # Dateiname ohne .yml/.yaml
category_name = path.stem
dirs = data.get("directories")
if isinstance(dirs, dict):
@@ -192,8 +188,11 @@ def _load_layer_dir(
def _load_defaults_from_package_or_project() -> Dict[str, Any]:
"""
Fallback: load default configs from various possible install or development
layouts (pip-installed, editable install, source repo with src/ layout).
Fallback: load default configs from possible install or dev layouts.
Supported locations:
- <pkg_root>/config (installed wheel / editable)
- <repo_root>/config (optional dev fallback when pkg_root is src/pkgmgr)
"""
try:
import pkgmgr # type: ignore
@@ -201,25 +200,16 @@ def _load_defaults_from_package_or_project() -> Dict[str, Any]:
return {"directories": {}, "repositories": []}
pkg_root = Path(pkgmgr.__file__).resolve().parent
roots = set()
candidates: List[Path] = []
# Case 1: installed package (site-packages/pkgmgr)
roots.add(pkg_root)
# Always prefer package-internal config dir
candidates.append(pkg_root / "config")
# Case 2: parent directory (site-packages/, src/)
roots.add(pkg_root.parent)
# Case 3: src-layout during development:
# repo_root/src/pkgmgr -> repo_root
# Dev fallback: repo_root/src/pkgmgr -> repo_root/config
parent = pkg_root.parent
if parent.name == "src":
roots.add(parent.parent)
# Candidate config dirs
candidates = []
for root in roots:
candidates.append(root / "config_defaults")
candidates.append(root / "config")
repo_root = parent.parent
candidates.append(repo_root / "config")
for cand in candidates:
defaults = _load_layer_dir(cand, skip_filename=None)
@@ -230,7 +220,7 @@ def _load_defaults_from_package_or_project() -> Dict[str, Any]:
# ---------------------------------------------------------------------------
# Hauptfunktion
# Public API
# ---------------------------------------------------------------------------
@@ -238,53 +228,49 @@ def load_config(user_config_path: str) -> Dict[str, Any]:
"""
Load and merge configuration for pkgmgr.
Schritte:
1. Ermittle ~/.config/pkgmgr/ (oder das Verzeichnis von user_config_path).
2. Lade alle *.yml/*.yaml dort (außer der User-Config selbst) als
Defaults / Kategorie-Layer.
3. Wenn dort nichts gefunden wurde, Fallback auf Paket/Projekt.
4. Lade die User-Config-Datei selbst (falls vorhanden).
Steps:
1. Determine ~/.config/pkgmgr/ (or dir of user_config_path).
2. Load all *.yml/*.yaml in that dir (except the user config file) as defaults.
3. If nothing found, fall back to package defaults.
4. Load the user config file (if present).
5. Merge:
- directories: deep-merge (Defaults <- User)
- repositories: _merge_repo_lists (Defaults <- User)
- directories: deep-merge (defaults <- user)
- repositories: _merge_repo_lists (defaults <- user)
"""
user_config_path_expanded = os.path.expanduser(user_config_path)
user_cfg_path = Path(user_config_path_expanded)
config_dir = user_cfg_path.parent
if not str(config_dir):
# Fallback, falls jemand nur "config.yaml" übergibt
config_dir = Path(os.path.expanduser("~/.config/pkgmgr"))
config_dir.mkdir(parents=True, exist_ok=True)
user_cfg_name = user_cfg_path.name
# 1+2) Defaults / Kategorie-Layer aus dem User-Verzeichnis
# 1+2) Defaults from user directory
defaults = _load_layer_dir(config_dir, skip_filename=user_cfg_name)
# 3) Falls dort nichts gefunden wurde, Fallback auf Paket/Projekt
# 3) Fallback to package defaults
if not defaults["directories"] and not defaults["repositories"]:
defaults = _load_defaults_from_package_or_project()
defaults.setdefault("directories", {})
defaults.setdefault("repositories", [])
# 4) User-Config
# 4) User config
user_cfg: Dict[str, Any] = {}
if user_cfg_path.is_file():
user_cfg = _load_yaml_file(user_cfg_path)
user_cfg.setdefault("directories", {})
user_cfg.setdefault("repositories", [])
# 5) Merge: directories deep-merge, repositories listen-merge
# 5) Merge
merged: Dict[str, Any] = {}
# directories
merged["directories"] = {}
_deep_merge(merged["directories"], defaults["directories"])
_deep_merge(merged["directories"], user_cfg["directories"])
# repositories
merged["repositories"] = []
_merge_repo_lists(
merged["repositories"], defaults["repositories"], category_name=None
@@ -293,7 +279,7 @@ def load_config(user_config_path: str) -> Dict[str, Any]:
merged["repositories"], user_cfg["repositories"], category_name=None
)
# andere Top-Level-Keys (falls vorhanden)
# Merge other top-level keys
other_keys = (set(defaults.keys()) | set(user_cfg.keys())) - {
"directories",
"repositories",

View File

@@ -20,7 +20,10 @@ from .get_tags_at_ref import GitTagsAtRefQueryError, get_tags_at_ref
from .get_upstream_ref import get_upstream_ref
from .list_remotes import list_remotes
from .list_tags import list_tags
from .probe_remote_reachable import probe_remote_reachable
from .probe_remote_reachable import (
probe_remote_reachable,
probe_remote_reachable_detail,
)
from .resolve_base_branch import GitBaseBranchNotFoundError, resolve_base_branch
__all__ = [
@@ -37,6 +40,7 @@ __all__ = [
"list_remotes",
"get_remote_push_urls",
"probe_remote_reachable",
"probe_remote_reachable_detail",
"get_changelog",
"GitChangelogQueryError",
"get_tags_at_ref",

View File

@@ -1,21 +1,121 @@
from __future__ import annotations
from typing import Tuple
from ..errors import GitRunError
from ..run import run
def probe_remote_reachable(url: str, cwd: str = ".") -> bool:
def _first_useful_line(text: str) -> str:
    """Pick the most informative non-empty line from captured git output.

    Preference order:
      1. the first line containing a known transport/auth failure keyword,
      2. the first line that is not a bare "error:" marker,
      3. otherwise the first non-empty line; "" when there is none.
    """
    keywords = (
        "fatal:",
        "permission denied",
        "repository not found",
        "could not read from remote repository",
        "connection refused",
        "connection timed out",
        "no route to host",
        "name or service not known",
        "temporary failure in name resolution",
        "host key verification failed",
        "could not resolve hostname",
        "authentication failed",
        "publickey",
        "the authenticity of host",
        "known_hosts",
    )
    stripped = [ln.strip() for ln in (text or "").splitlines() if ln.strip()]
    if not stripped:
        return ""
    keyword_hit = next(
        (s for s in stripped if any(k in s.lower() for k in keywords)),
        None,
    )
    if keyword_hit is not None:
        return keyword_hit
    # Avoid returning a meaningless "error:" if possible.
    informative = next(
        (s for s in stripped if s.lower() not in ("error:", "error")),
        None,
    )
    return informative if informative is not None else stripped[0]
def _looks_like_real_transport_error(text: str) -> bool:
    """
    True if stderr/stdout contains strong indicators that the remote is NOT usable.

    Used to distinguish a genuinely broken remote (auth/DNS/transport failure)
    from benign non-zero exits such as `git ls-remote --exit-code` on an empty
    repository.
    """
    # NOTE: the old docstring line "Check whether a remote URL is reachable."
    # belonged to the removed probe_remote_reachable() implementation and was
    # misleading here; this helper only classifies error text.
    low = (text or "").lower()
    indicators = (
        "repository not found",
        "could not read from remote repository",
        "permission denied",
        "authentication failed",
        "publickey",
        "host key verification failed",
        "could not resolve hostname",
        "name or service not known",
        "connection refused",
        "connection timed out",
        "no route to host",
    )
    return any(i in low for i in indicators)
def _format_reason(exc: GitRunError, *, url: str) -> str:
    """Build a compact one-line failure reason from a GitRunError.

    Combines the most useful stderr/stdout/str(exc) line with the exit code;
    falls back to an ssh debugging hint when nothing informative is available.
    """
    err_text = getattr(exc, "stderr", "") or ""
    out_text = getattr(exc, "stdout", "") or ""
    exit_code = getattr(exc, "returncode", None)
    reason = (
        _first_useful_line(err_text)
        or _first_useful_line(out_text)
        or _first_useful_line(str(exc))
    )
    if exit_code is not None:
        if reason:
            reason = f"(exit {exit_code}) {reason}".strip()
        else:
            reason = f"(exit {exit_code})"
    # If we still have nothing useful, provide a hint to debug SSH transport.
    if not reason or reason.lower() in ("(exit 2)", "(exit 128)"):
        hint = (
            f"{reason} | hint: run "
            f"GIT_SSH_COMMAND='ssh -vvv' git ls-remote --exit-code {url!r}"
        )
        reason = hint.strip()
    return reason.strip()
def probe_remote_reachable_detail(url: str, cwd: str = ".") -> Tuple[bool, str]:
    """
    Probe whether a remote URL is reachable.

    Implementation detail:
    - We run `git ls-remote --exit-code <url>`.
    - Git may return exit code 2 when the remote is reachable but no refs exist
      (e.g. an empty repository). We treat that as reachable.

    Returns:
        (True, note) when reachable (note may describe an empty repository),
        (False, reason) otherwise, where reason is a one-line diagnostic.
    """
    # Defect fixed: the previous text interleaved the removed boolean
    # implementation (a stray bare `return True` and a duplicate, unreachable
    # `except GitRunError:` clause returning False), which broke the
    # Tuple[bool, str] contract on the success path.
    try:
        run(["ls-remote", "--exit-code", url], cwd=cwd)
        return True, ""
    except GitRunError as exc:
        rc = getattr(exc, "returncode", None)
        stderr = getattr(exc, "stderr", "") or ""
        stdout = getattr(exc, "stdout", "") or ""
        # Important: `git ls-remote --exit-code` uses exit code 2 when no refs match.
        # For a completely empty repo, this can happen even though auth/transport is OK.
        if rc == 2 and not _looks_like_real_transport_error(stderr + "\n" + stdout):
            return True, "remote reachable, but no refs found yet (empty repository)"
        return False, _format_reason(exc, url=url)
def probe_remote_reachable(url: str, cwd: str = ".") -> bool:
    """Backwards-compatible boolean probe; discards the detail message."""
    ok, _ = probe_remote_reachable_detail(url, cwd=cwd)
    return ok

View File

@@ -42,16 +42,34 @@ def run(
)
except subprocess.CalledProcessError as exc:
stderr = exc.stderr or ""
if _is_not_repo_error(stderr):
raise GitNotRepositoryError(
f"Not a git repository: {cwd!r}\nCommand: {cmd_str}\nSTDERR:\n{stderr}"
) from exc
stdout = exc.stdout or ""
raise GitRunError(
if _is_not_repo_error(stderr):
err = GitNotRepositoryError(
f"Not a git repository: {cwd!r}\nCommand: {cmd_str}\nSTDERR:\n{stderr}"
)
# Attach details for callers who want to debug
err.cwd = cwd
err.cmd = cmd
err.cmd_str = cmd_str
err.returncode = exc.returncode
err.stdout = stdout
err.stderr = stderr
raise err from exc
err = GitRunError(
f"Git command failed in {cwd!r}: {cmd_str}\n"
f"Exit code: {exc.returncode}\n"
f"STDOUT:\n{exc.stdout}\n"
f"STDOUT:\n{stdout}\n"
f"STDERR:\n{stderr}"
) from exc
)
# Attach details for callers who want to debug
err.cwd = cwd
err.cmd = cmd
err.cmd_str = cmd_str
err.returncode = exc.returncode
err.stdout = stdout
err.stderr = stderr
raise err from exc
return result.stdout.strip()

View File

@@ -1,12 +1,13 @@
# src/pkgmgr/core/remote_provisioning/__init__.py
"""Remote repository provisioning (ensure remote repo exists)."""
from .ensure import ensure_remote_repo
from .registry import ProviderRegistry
from .types import EnsureResult, ProviderHint, RepoSpec
from .visibility import set_repo_visibility
__all__ = [
"ensure_remote_repo",
"set_repo_visibility",
"RepoSpec",
"EnsureResult",
"ProviderHint",

View File

@@ -1,4 +1,3 @@
# src/pkgmgr/core/remote_provisioning/providers/base.py
from __future__ import annotations
from abc import ABC, abstractmethod
@@ -23,7 +22,26 @@ class RemoteProvider(ABC):
def create_repo(self, token: str, spec: RepoSpec) -> EnsureResult:
"""Create a repository (owner may be user or org)."""
@abstractmethod
def get_repo_private(self, token: str, spec: RepoSpec) -> bool | None:
    """
    Return current repo privacy, or None if repo not found / inaccessible.

    IMPORTANT:
    - Must NOT create repositories.
    - Should return None on 404 (not found) or when the repo cannot be accessed.
    """
@abstractmethod
def set_repo_private(self, token: str, spec: RepoSpec, *, private: bool) -> None:
    """
    Update repo privacy (PATCH). Must NOT create repositories.

    Implementations should raise HttpError on API failure.
    """
def ensure_repo(self, token: str, spec: RepoSpec) -> EnsureResult:
    """Ensure repository exists (create if missing)."""
    # Existence check first so an already-present remote repo is never touched.
    if self.repo_exists(token, spec):
        return EnsureResult(status="exists", message="Repository exists.")
    return self.create_repo(token, spec)

View File

@@ -52,6 +52,39 @@ class GiteaProvider(RemoteProvider):
return False
raise
def get_repo_private(self, token: str, spec: RepoSpec) -> bool | None:
    """Read-only privacy lookup; None when the repo is missing or unreadable."""
    endpoint = f"{self._api_base(spec.host)}/api/v1/repos/{spec.owner}/{spec.name}"
    try:
        resp = self._http.request_json("GET", endpoint, headers=self._headers(token))
    except HttpError as exc:
        if exc.status == 404:
            return None
        raise
    if 200 <= resp.status < 300:
        payload = resp.json or {}
        return bool(payload.get("private", False))
    return None
def set_repo_private(self, token: str, spec: RepoSpec, *, private: bool) -> None:
    """Update the repo's `private` flag via PATCH; raises HttpError on failure."""
    endpoint = f"{self._api_base(spec.host)}/api/v1/repos/{spec.owner}/{spec.name}"
    body: Dict[str, Any] = {"private": bool(private)}
    resp = self._http.request_json(
        "PATCH",
        endpoint,
        headers=self._headers(token),
        payload=body,
    )
    if 200 <= resp.status < 300:
        return
    raise HttpError(
        status=resp.status,
        message="Failed to update repository.",
        body=resp.text,
    )
def create_repo(self, token: str, spec: RepoSpec) -> EnsureResult:
base = self._api_base(spec.host)

View File

@@ -54,6 +54,39 @@ class GitHubProvider(RemoteProvider):
return False
raise
def get_repo_private(self, token: str, spec: RepoSpec) -> bool | None:
    """Read-only privacy lookup; None when the repo is missing or unreadable."""
    endpoint = f"{self._api_base(spec.host)}/repos/{spec.owner}/{spec.name}"
    try:
        resp = self._http.request_json("GET", endpoint, headers=self._headers(token))
    except HttpError as exc:
        if exc.status == 404:
            return None
        raise
    if 200 <= resp.status < 300:
        payload = resp.json or {}
        return bool(payload.get("private", False))
    return None
def set_repo_private(self, token: str, spec: RepoSpec, *, private: bool) -> None:
    """Update the repo's `private` flag via PATCH; raises HttpError on failure."""
    endpoint = f"{self._api_base(spec.host)}/repos/{spec.owner}/{spec.name}"
    body: Dict[str, Any] = {"private": bool(private)}
    resp = self._http.request_json(
        "PATCH",
        endpoint,
        headers=self._headers(token),
        payload=body,
    )
    if 200 <= resp.status < 300:
        return
    raise HttpError(
        status=resp.status,
        message="Failed to update repository.",
        body=resp.text,
    )
def create_repo(self, token: str, spec: RepoSpec) -> EnsureResult:
api = self._api_base(spec.host)

View File

@@ -1,10 +1,17 @@
# src/pkgmgr/core/remote_provisioning/types.py
from __future__ import annotations
from dataclasses import dataclass
from typing import Literal, Optional
# Lifecycle states reported by provisioning / visibility operations.
# (Defect fixed: diff residue left the superseded four-value definition
# immediately rebound by the new one; the stale assignment is removed.)
EnsureStatus = Literal[
    "exists",
    "created",
    "updated",
    "noop",
    "notfound",
    "skipped",
    "failed",
]
@dataclass(frozen=True)

View File

@@ -0,0 +1,118 @@
# src/pkgmgr/core/remote_provisioning/visibility.py
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional
from pkgmgr.core.credentials.resolver import ResolutionOptions, TokenResolver
from .http.errors import HttpError
from .registry import ProviderRegistry
from .types import (
AuthError,
EnsureResult,
NetworkError,
PermissionError,
ProviderHint,
RepoSpec,
UnsupportedProviderError,
)
@dataclass(frozen=True)
class VisibilityOptions:
    """Options controlling remote visibility updates."""

    # When True, perform no remote changes (dry-run).
    preview: bool = False
    # Allow interactive token resolution.
    interactive: bool = True
    # Permit prompting the user for a missing token.
    allow_prompt: bool = True
    # Persist a token obtained via prompt into the system keyring.
    save_prompt_token_to_keyring: bool = True
def _raise_mapped_http_error(exc: HttpError, host: str) -> None:
"""Map HttpError into domain-specific error types."""
if exc.status == 0:
raise NetworkError(f"Network error while talking to {host}: {exc}") from exc
if exc.status == 401:
raise AuthError(f"Authentication failed for {host} (401).") from exc
if exc.status == 403:
raise PermissionError(f"Permission denied for {host} (403).") from exc
raise NetworkError(
f"HTTP error from {host}: status={exc.status}, message={exc}, body={exc.body}"
) from exc
def set_repo_visibility(
    spec: RepoSpec,
    *,
    private: bool,
    provider_hint: Optional[ProviderHint] = None,
    options: Optional[VisibilityOptions] = None,
    registry: Optional[ProviderRegistry] = None,
    token_resolver: Optional[TokenResolver] = None,
) -> EnsureResult:
    """
    Set repository visibility (public/private) WITHOUT creating repositories.

    Behavior:
    - If repo does not exist -> status=notfound
    - If already desired -> status=noop
    - If changed -> status=updated
    - Respects preview mode -> status=skipped
    - Maps HTTP errors to domain-specific errors

    Parameters:
        spec: repository coordinates (host, owner, name).
        private: desired privacy flag (True -> private, False -> public).
        provider_hint: optional forced provider kind; overrides host matching.
        options, registry, token_resolver: injectable collaborators; fresh
            defaults are constructed when omitted.

    Raises:
        UnsupportedProviderError: when no provider matches spec.host.
        NetworkError / AuthError / PermissionError: mapped from HttpError.
    """
    opts = options or VisibilityOptions()
    reg = registry or ProviderRegistry.default()
    resolver = token_resolver or TokenResolver()
    # Host-based provider resolution first; an explicit hint may override below.
    provider = reg.resolve(spec.host)
    if provider_hint and provider_hint.kind:
        forced = provider_hint.kind.strip().lower()
        forced_provider = next(
            (p for p in reg.providers if getattr(p, "kind", "").lower() == forced),
            None,
        )
        if forced_provider is not None:
            provider = forced_provider
    if provider is None:
        raise UnsupportedProviderError(f"No provider matched host: {spec.host}")
    token_opts = ResolutionOptions(
        interactive=opts.interactive,
        allow_prompt=opts.allow_prompt,
        save_prompt_token_to_keyring=opts.save_prompt_token_to_keyring,
    )
    # NOTE(review): the token is resolved (and may prompt the user / write to
    # the keyring) even in preview mode, because the preview short-circuit
    # below comes after this call — confirm this ordering is intended.
    token = resolver.get_token(
        provider_kind=getattr(provider, "kind", "unknown"),
        host=spec.host,
        owner=spec.owner,
        options=token_opts,
    )
    if opts.preview:
        return EnsureResult(
            status="skipped",
            message="Preview mode: no remote changes performed.",
        )
    try:
        current_private = provider.get_repo_private(token.token, spec)
        if current_private is None:
            return EnsureResult(status="notfound", message="Repository not found.")
        if bool(current_private) == bool(private):
            return EnsureResult(
                status="noop",
                message=f"Repository already {'private' if private else 'public'}.",
            )
        provider.set_repo_private(token.token, spec, private=private)
        return EnsureResult(
            status="updated",
            message=f"Visibility updated to {'private' if private else 'public'}.",
        )
    except HttpError as exc:
        _raise_mapped_http_error(exc, host=spec.host)
        # _raise_mapped_http_error always raises; this return only satisfies
        # static control-flow analysis.
        return EnsureResult(status="failed", message="Unreachable error mapping.")

View File

@@ -0,0 +1,127 @@
# tests/e2e/test_mirror_visibility_smoke.py
from __future__ import annotations
import os
import shutil
import subprocess
import sys
import unittest
from pathlib import Path
class TestMirrorVisibilityE2ESmoke(unittest.TestCase):
    """
    E2E smoke tests for the new mirror visibility feature.

    We intentionally DO NOT execute provider APIs or require tokens.
    The tests only verify that:
    - CLI exposes the new subcommands / flags via --help
    - Python public API surface is wired and importable

    IMPORTANT:
    - `python -m pkgmgr.cli` is NOT valid unless pkgmgr/cli/__main__.py exists.
    - In this repo, `from pkgmgr.cli import main` is the stable entrypoint.
    """

    @staticmethod
    def _project_root() -> Path:
        # tests/e2e/... -> project root is parents[2]
        return Path(__file__).resolve().parents[2]

    def _run(self, args: list[str]) -> subprocess.CompletedProcess[str]:
        # Run a subprocess from the project root; stderr is folded into stdout
        # so assertion messages can show the full output in one place.
        env = os.environ.copy()
        env.setdefault("PYTHONUNBUFFERED", "1")
        return subprocess.run(
            args,
            cwd=str(self._project_root()),
            env=env,
            text=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            check=False,
        )

    def _run_pkgmgr(self, pkgmgr_args: list[str]) -> subprocess.CompletedProcess[str]:
        """
        Run the pkgmgr CLI in a way that works both:
        - when the console script `pkgmgr` is available on PATH
        - when only source imports are available

        We prefer the console script if present because it's closest to real E2E.
        """
        exe = shutil.which("pkgmgr")
        if exe:
            return self._run([exe, *pkgmgr_args])
        # Fallback to a Python-level entrypoint that exists in your repo:
        # The stacktrace showed: from pkgmgr.cli import main
        # We call it with argv simulation.
        code = r"""
import sys
from pkgmgr.cli import main
sys.argv = ["pkgmgr"] + sys.argv[1:]
main()
"""
        return self._run([sys.executable, "-c", code, *pkgmgr_args])

    def test_cli_help_lists_visibility_and_provision_public(self) -> None:
        # `pkgmgr mirror --help` should mention "visibility"
        p = self._run_pkgmgr(["mirror", "--help"])
        self.assertEqual(
            p.returncode,
            0,
            msg=f"Expected exit code 0, got {p.returncode}\n\nOutput:\n{p.stdout}",
        )
        out_lower = p.stdout.lower()
        self.assertIn("visibility", out_lower)
        self.assertIn("provision", out_lower)
        # `pkgmgr mirror provision --help` should show `--public`
        p = self._run_pkgmgr(["mirror", "provision", "--help"])
        self.assertEqual(
            p.returncode,
            0,
            msg=f"Expected exit code 0, got {p.returncode}\n\nOutput:\n{p.stdout}",
        )
        self.assertIn("--public", p.stdout)
        # `pkgmgr mirror visibility --help` should show choices {private, public}
        p = self._run_pkgmgr(["mirror", "visibility", "--help"])
        self.assertEqual(
            p.returncode,
            0,
            msg=f"Expected exit code 0, got {p.returncode}\n\nOutput:\n{p.stdout}",
        )
        out_lower = p.stdout.lower()
        self.assertIn("private", out_lower)
        self.assertIn("public", out_lower)

    def test_python_api_surface_is_exposed(self) -> None:
        # Ensure public exports exist and setup_mirrors has ensure_visibility in signature.
        code = r"""
import inspect
from pkgmgr.actions import mirror as mirror_actions
from pkgmgr.core import remote_provisioning as rp
assert hasattr(mirror_actions, "set_mirror_visibility"), "set_mirror_visibility missing in pkgmgr.actions.mirror"
assert hasattr(rp, "set_repo_visibility"), "set_repo_visibility missing in pkgmgr.core.remote_provisioning"
sig = inspect.signature(mirror_actions.setup_mirrors)
assert "ensure_visibility" in sig.parameters, "setup_mirrors missing ensure_visibility parameter"
print("OK")
"""
        p = self._run([sys.executable, "-c", code])
        self.assertEqual(
            p.returncode,
            0,
            msg=f"Expected exit code 0, got {p.returncode}\n\nOutput:\n{p.stdout}",
        )
        self.assertIn("OK", p.stdout)
if __name__ == "__main__":
    # Allow running this test file directly, outside a test runner.
    unittest.main()

View File

@@ -0,0 +1,118 @@
# tests/integration/test_config_defaults_integration.py
from __future__ import annotations
import os
import sys
import tempfile
import types
import unittest
from pathlib import Path
from unittest.mock import patch
import yaml
from pkgmgr.core.config.load import load_config
from pkgmgr.cli.commands import config as config_cmd
class ConfigDefaultsIntegrationTest(unittest.TestCase):
    """Integration coverage for packaged default configs (loading + copying)."""

    def test_defaults_yaml_is_loaded_and_can_be_copied_to_user_config_dir(self):
        """
        Integration test:
        - Create a temp "site-packages/pkgmgr" fake install root
        - Put defaults under "<pkg_root>/config/defaults.yaml"
        - Verify:
          A) load_config() picks up defaults from that config folder when user dir has no defaults
          B) _update_default_configs() copies defaults.yaml into ~/.config/pkgmgr/
        """
        with tempfile.TemporaryDirectory() as td:
            root = Path(td)
            # Fake HOME for user config
            home = root / "home"
            user_cfg_dir = home / ".config" / "pkgmgr"
            user_cfg_dir.mkdir(parents=True)
            user_config_path = str(user_cfg_dir / "config.yaml")
            # Create a user config file that should NOT be overwritten by update
            (user_cfg_dir / "config.yaml").write_text(
                yaml.safe_dump({"directories": {"user_only": "/home/user"}}),
                encoding="utf-8",
            )
            # Fake pkg install layout:
            # pkg_root = <root>/site-packages/pkgmgr
            site_packages = root / "site-packages"
            pkg_root = site_packages / "pkgmgr"
            pkg_root.mkdir(parents=True)
            # defaults live inside the package now: <pkg_root>/config/defaults.yaml
            config_dir = pkg_root / "config"
            config_dir.mkdir(parents=True)
            defaults_payload = {
                "directories": {
                    "repositories": "/opt/Repositories",
                    "binaries": "/usr/local/bin",
                },
                "repositories": [
                    {"provider": "github", "account": "acme", "repository": "demo"}
                ],
            }
            (config_dir / "defaults.yaml").write_text(
                yaml.safe_dump(defaults_payload),
                encoding="utf-8",
            )
            # Provide fake pkgmgr module so your functions resolve pkg_root correctly
            fake_pkgmgr = types.SimpleNamespace(__file__=str(pkg_root / "__init__.py"))
            with patch.dict(sys.modules, {"pkgmgr": fake_pkgmgr}):
                with patch.dict(os.environ, {"HOME": str(home)}):
                    # A) load_config should fall back to <pkg_root>/config/defaults.yaml
                    merged = load_config(user_config_path)
                    self.assertEqual(
                        merged["directories"]["repositories"], "/opt/Repositories"
                    )
                    self.assertEqual(
                        merged["directories"]["binaries"], "/usr/local/bin"
                    )
                    # user-only key must still exist (user config merges over defaults)
                    self.assertEqual(merged["directories"]["user_only"], "/home/user")
                    self.assertIn("repositories", merged)
                    self.assertTrue(
                        any(
                            r.get("provider") == "github"
                            and r.get("account") == "acme"
                            and r.get("repository") == "demo"
                            for r in merged["repositories"]
                        )
                    )
                    # B) update_default_configs should copy defaults.yaml to ~/.config/pkgmgr/
                    before_config_yaml = (user_cfg_dir / "config.yaml").read_text(
                        encoding="utf-8"
                    )
                    config_cmd._update_default_configs(user_config_path)
                    self.assertTrue((user_cfg_dir / "defaults.yaml").is_file())
                    copied_defaults = yaml.safe_load(
                        (user_cfg_dir / "defaults.yaml").read_text(encoding="utf-8")
                    )
                    self.assertEqual(
                        copied_defaults["directories"]["repositories"],
                        "/opt/Repositories",
                    )
                    after_config_yaml = (user_cfg_dir / "config.yaml").read_text(
                        encoding="utf-8"
                    )
                    self.assertEqual(after_config_yaml, before_config_yaml)
if __name__ == "__main__":
    # Allow running this test file directly, outside a test runner.
    unittest.main()

View File

@@ -113,17 +113,12 @@ class TestIntegrationMirrorCommands(unittest.TestCase):
)
)
# Deterministic remote probing (new refactor: probe_remote_reachable)
# Deterministic remote probing (refactor: probe_remote_reachable_detail)
# Patch where it is USED (setup_cmd imported it directly).
stack.enter_context(
_p(
"pkgmgr.core.git.queries.probe_remote_reachable",
return_value=True,
)
)
stack.enter_context(
_p(
"pkgmgr.actions.mirror.setup_cmd.probe_remote_reachable",
return_value=True,
"pkgmgr.actions.mirror.setup_cmd.probe_remote_reachable_detail",
return_value=(True, ""),
)
)

View File

@@ -0,0 +1,220 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Integration test for mirror probing + provisioning after refactor.
We test the CLI entrypoint `handle_mirror_command()` directly to avoid
depending on repo-selection / config parsing for `--all`.
Covers:
- setup_cmd uses probe_remote_reachable_detail()
- check prints [OK]/[WARN] and 'reason:' lines for failures
- provision triggers ensure_remote_repo (preview-safe) for each git mirror
"""
from __future__ import annotations
import io
import tempfile
import unittest
from contextlib import redirect_stderr, redirect_stdout
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import MagicMock, PropertyMock, patch
from pkgmgr.cli.commands.mirror import handle_mirror_command
class TestIntegrationMirrorProbeDetailAndProvision(unittest.TestCase):
    """Integration tests for mirror check/provision via handle_mirror_command()."""

    def _make_ctx(
        self, *, repositories_base_dir: str, all_repositories: list[dict]
    ) -> MagicMock:
        # Minimal CLI context stand-in consumed by handle_mirror_command().
        ctx = MagicMock()
        ctx.repositories_base_dir = repositories_base_dir
        ctx.all_repositories = all_repositories
        # mirror merge may look at this; keep it present for safety
        ctx.user_config_path = str(Path(repositories_base_dir) / "user.yml")
        return ctx

    def _make_dummy_repo_ctx(self, *, repo_dir: str) -> MagicMock:
        """
        This is the RepoMirrorContext-like object returned by build_context().
        """
        dummy = MagicMock()
        dummy.identifier = "dummy-repo"
        dummy.repo_dir = repo_dir
        dummy.config_mirrors = {"origin": "git@github.com:alice/repo.git"}
        dummy.file_mirrors = {"backup": "ssh://git@git.example:2201/alice/repo.git"}
        type(dummy).resolved_mirrors = PropertyMock(
            return_value={
                "origin": "git@github.com:alice/repo.git",
                "backup": "ssh://git@git.example:2201/alice/repo.git",
            }
        )
        return dummy

    def _run_handle(
        self,
        *,
        subcommand: str,
        preview: bool,
        selected: list[dict],
        dummy_repo_dir: str,
        probe_detail_side_effect,
    ) -> str:
        """
        Run handle_mirror_command() with patched side effects and capture output.
        """
        args = SimpleNamespace(subcommand=subcommand, preview=preview)

        # Fake ensure_remote_repo result (preview safe)
        def _fake_ensure_remote_repo(spec, provider_hint=None, options=None):
            if options is not None and getattr(options, "preview", False) is not True:
                raise AssertionError(
                    "ensure_remote_repo called without preview=True (should never happen in tests)."
                )
            r = MagicMock()
            r.status = "preview"
            r.message = "Preview mode: no remote provisioning performed."
            r.url = None
            return r

        buf = io.StringIO()
        ctx = self._make_ctx(
            repositories_base_dir=str(Path(dummy_repo_dir).parent),
            all_repositories=selected,
        )
        dummy_repo_ctx = self._make_dummy_repo_ctx(repo_dir=dummy_repo_dir)
        # Patch the collaborators where they are USED (module under test),
        # not where they are defined, so lookups hit the fakes.
        with (
            patch(
                "pkgmgr.actions.mirror.setup_cmd.build_context",
                return_value=dummy_repo_ctx,
            ),
            patch(
                "pkgmgr.actions.mirror.setup_cmd.ensure_origin_remote",
                return_value=None,
            ),
            patch(
                "pkgmgr.actions.mirror.git_remote.ensure_origin_remote",
                return_value=None,
            ),
            patch(
                "pkgmgr.actions.mirror.setup_cmd.probe_remote_reachable_detail",
                side_effect=probe_detail_side_effect,
            ),
            patch(
                "pkgmgr.actions.mirror.remote_provision.ensure_remote_repo",
                side_effect=_fake_ensure_remote_repo,
            ),
            redirect_stdout(buf),
            redirect_stderr(buf),
        ):
            handle_mirror_command(ctx, args, selected)
        return buf.getvalue()

    def test_mirror_check_preview_prints_warn_reason(self) -> None:
        """
        'mirror check --preview' should:
        - probe both git mirrors
        - print [OK] for origin
        - print [WARN] for backup + reason line
        """
        with tempfile.TemporaryDirectory() as tmp:
            tmp_path = Path(tmp)
            repo_dir = tmp_path / "dummy-repo"
            repo_dir.mkdir(parents=True, exist_ok=True)
            selected = [
                {"provider": "github.com", "account": "alice", "repository": "repo"}
            ]

            def probe_side_effect(url: str, cwd: str = "."):
                if "github.com" in url:
                    # show "empty repo reachable" note; setup_cmd prints [OK] and does not print reason for ok
                    return (
                        True,
                        "remote reachable, but no refs found yet (empty repository)",
                    )
                return False, "(exit 128) fatal: Could not read from remote repository."

            out = self._run_handle(
                subcommand="check",
                preview=True,
                selected=selected,
                dummy_repo_dir=str(repo_dir),
                probe_detail_side_effect=probe_side_effect,
            )
            self.assertIn("[MIRROR SETUP:REMOTE]", out)
            # origin OK (even with a note returned; still OK)
            self.assertIn("[OK] origin: git@github.com:alice/repo.git", out)
            # backup WARN prints reason line
            self.assertIn(
                "[WARN] backup: ssh://git@git.example:2201/alice/repo.git", out
            )
            self.assertIn("reason:", out)
            self.assertIn("Could not read from remote repository", out)

    def test_mirror_provision_preview_provisions_each_git_mirror(self) -> None:
        """
        'mirror provision --preview' should:
        - print provisioning lines for each git mirror
        - still probe and print [OK]/[WARN]
        - call ensure_remote_repo only in preview mode (enforced by fake)
        """
        with tempfile.TemporaryDirectory() as tmp:
            tmp_path = Path(tmp)
            repo_dir = tmp_path / "dummy-repo"
            repo_dir.mkdir(parents=True, exist_ok=True)
            selected = [
                {
                    "provider": "github.com",
                    "account": "alice",
                    "repository": "repo",
                    "private": True,
                    "description": "desc",
                }
            ]

            def probe_side_effect(url: str, cwd: str = "."):
                if "github.com" in url:
                    return True, ""
                return False, "(exit 128) fatal: Could not read from remote repository."

            out = self._run_handle(
                subcommand="provision",
                preview=True,
                selected=selected,
                dummy_repo_dir=str(repo_dir),
                probe_detail_side_effect=probe_side_effect,
            )
            # provisioning should attempt BOTH mirrors
            self.assertIn(
                "[REMOTE ENSURE] ensuring mirror 'origin': git@github.com:alice/repo.git",
                out,
            )
            self.assertIn(
                "[REMOTE ENSURE] ensuring mirror 'backup': ssh://git@git.example:2201/alice/repo.git",
                out,
            )
            # patched ensure_remote_repo prints PREVIEW status via remote_provision
            self.assertIn("[REMOTE ENSURE]", out)
            self.assertIn("PREVIEW", out.upper())
            # probes after provisioning
            self.assertIn("[OK] origin: git@github.com:alice/repo.git", out)
            self.assertIn(
                "[WARN] backup: ssh://git@git.example:2201/alice/repo.git", out
            )
# Allow running this test module directly (outside a pytest/unittest runner).
if __name__ == "__main__":
    unittest.main()

View File

@@ -40,8 +40,8 @@ class TestCreateRepoPypiNotInGitConfig(unittest.TestCase):
with (
# Avoid any real network calls during mirror "remote probing"
patch(
"pkgmgr.actions.mirror.setup_cmd.probe_remote_reachable",
return_value=True,
"pkgmgr.actions.mirror.setup_cmd.probe_remote_reachable_detail",
return_value=(True, ""),
),
# Force templates to come from our temp directory
patch(

View File

@@ -0,0 +1,322 @@
# tests/integration/test_visibility_integration.py
from __future__ import annotations
import io
import os
import tempfile
import types
import unittest
from contextlib import redirect_stdout
from typing import Any, Dict, List, Optional, Tuple
from unittest.mock import patch
from pkgmgr.actions.mirror.setup_cmd import setup_mirrors
from pkgmgr.actions.mirror.visibility_cmd import set_mirror_visibility
from pkgmgr.core.remote_provisioning.types import RepoSpec
Repository = Dict[str, Any]
class _FakeRegistry:
"""
Minimal ProviderRegistry-like object for tests.
- has .providers for provider-hint selection
- has .resolve(host) to pick a provider
"""
def __init__(self, provider: Any) -> None:
self.providers = [provider]
self._provider = provider
def resolve(self, host: str) -> Any:
return self._provider
class FakeProvider:
    """
    Fake remote provider implementing the visibility API surface.

    Host matching is deliberately tolerant, because
    normalize_provider_host()/URL parsing may drop ports or schemes.
    """

    kind = "gitea"

    def __init__(self) -> None:
        # (host, owner, name) -> private flag
        self.privacy: Dict[Tuple[str, str, str], bool] = {}
        # Recorded (method-name, args) tuples for later inspection.
        self.calls: List[Tuple[str, Any]] = []

    def can_handle(self, host: str) -> bool:
        return True

    def _candidate_hosts(self, host: str) -> List[str]:
        """
        Expand *host* into equivalent lookup variants:
        - as given
        - without a leading http:// or https:// scheme
        - without a trailing :port
        """
        raw = (host or "").strip()
        if not raw:
            return [raw]
        variants = [raw]
        # Drop the scheme, if any.
        for scheme in ("http://", "https://"):
            if raw.startswith(scheme):
                variants.append(raw[len(scheme):])
        # Drop a port suffix (host:port) from every variant collected so far.
        for entry in list(variants):
            if ":" in entry:
                variants.append(entry.split(":", 1)[0])
        # De-duplicate while preserving first-seen order.
        unique: List[str] = []
        for entry in variants:
            if entry not in unique:
                unique.append(entry)
        return unique

    def repo_exists(self, token: str, spec: RepoSpec) -> bool:
        self.calls.append(("repo_exists", (token, spec)))
        return any(
            (h, spec.owner, spec.name) in self.privacy
            for h in self._candidate_hosts(spec.host)
        )

    def create_repo(self, token: str, spec: RepoSpec):
        self.calls.append(("create_repo", (token, spec)))
        # Stored verbatim under the host given in the spec.
        self.privacy[(spec.host, spec.owner, spec.name)] = bool(spec.private)
        return types.SimpleNamespace(status="created", message="created", url=None)

    def get_repo_private(self, token: str, spec: RepoSpec) -> Optional[bool]:
        self.calls.append(("get_repo_private", (token, spec)))
        for h in self._candidate_hosts(spec.host):
            key = (h, spec.owner, spec.name)
            if key in self.privacy:
                return self.privacy[key]
        return None

    def set_repo_private(self, token: str, spec: RepoSpec, *, private: bool) -> None:
        self.calls.append(("set_repo_private", (token, spec, private)))
        desired = bool(private)
        # Update whichever alias already exists; otherwise create on spec.host.
        for h in self._candidate_hosts(spec.host):
            key = (h, spec.owner, spec.name)
            if key in self.privacy:
                self.privacy[key] = desired
                return
        self.privacy[(spec.host, spec.owner, spec.name)] = desired
def _mk_ctx(*, identifier: str, repo_dir: str, mirrors: Dict[str, str]) -> Any:
return types.SimpleNamespace(
identifier=identifier,
repo_dir=repo_dir,
resolved_mirrors=mirrors,
)
class TestMirrorVisibilityIntegration(unittest.TestCase):
    """
    Integration tests for:
    - pkgmgr.actions.mirror.visibility_cmd.set_mirror_visibility
    - pkgmgr.actions.mirror.setup_cmd.setup_mirrors (ensure_visibility semantics)
    """

    def setUp(self) -> None:
        # Fresh temp dir per test; cleaned up even when the test fails.
        self.tmp = tempfile.TemporaryDirectory()
        self.addCleanup(self.tmp.cleanup)

    def _repo_dir(self, name: str) -> str:
        # Create (and return) a per-repo working directory under the temp root.
        d = os.path.join(self.tmp.name, name)
        os.makedirs(d, exist_ok=True)
        return d

    # NOTE: @patch decorators apply bottom-up, so parameters arrive in the
    # order build_context, registry_default, get_token.
    @patch("pkgmgr.core.credentials.resolver.TokenResolver.get_token")
    @patch("pkgmgr.core.remote_provisioning.visibility.ProviderRegistry.default")
    @patch("pkgmgr.actions.mirror.visibility_cmd.build_context")
    def test_mirror_visibility_applies_to_all_git_mirrors_updated_and_noop(
        self,
        m_build_context,
        m_registry_default,
        m_get_token,
    ) -> None:
        """
        Scenario:
        - repo has two git mirrors
        - one mirror needs update -> UPDATED
        - second mirror already desired -> NOOP
        """
        provider = FakeProvider()
        registry = _FakeRegistry(provider)
        m_registry_default.return_value = registry
        # Avoid interactive token prompt
        m_get_token.return_value = types.SimpleNamespace(token="test-token")
        # Seed provider state:
        # - repo1 currently private=True
        # - We'll set visibility to public -> should UPDATE
        provider.privacy[("git.veen.world", "me", "repo1")] = True
        repo = {"id": "repo1", "description": "Repo 1"}
        repo_dir = self._repo_dir("repo1")
        m_build_context.return_value = _mk_ctx(
            identifier="repo1",
            repo_dir=repo_dir,
            mirrors={
                "origin": "ssh://git.veen.world:2201/me/repo1.git",
                "backup": "https://git.veen.world:2201/me/repo1.git",
            },
        )
        buf = io.StringIO()
        with redirect_stdout(buf):
            set_mirror_visibility(
                selected_repos=[repo],
                repositories_base_dir=self.tmp.name,
                all_repos=[repo],
                visibility="public",
                preview=False,
            )
        out = buf.getvalue()
        # We apply to BOTH git mirrors.
        self.assertIn("[MIRROR VISIBILITY] applying to mirror 'origin':", out)
        self.assertIn("[MIRROR VISIBILITY] applying to mirror 'backup':", out)
        # After first update, second call will see it already public (NOOP).
        self.assertIn("[REMOTE VISIBILITY] UPDATED:", out)
        self.assertIn("[REMOTE VISIBILITY] NOOP:", out)
        # Final state must be public (private=False)
        self.assertFalse(provider.privacy[("git.veen.world", "me", "repo1")])

    @patch("pkgmgr.core.credentials.resolver.TokenResolver.get_token")
    @patch("pkgmgr.core.remote_provisioning.visibility.ProviderRegistry.default")
    @patch("pkgmgr.actions.mirror.visibility_cmd.build_context")
    @patch("pkgmgr.actions.mirror.visibility_cmd.determine_primary_remote_url")
    def test_mirror_visibility_fallback_to_primary_when_no_git_mirrors(
        self,
        m_determine_primary,
        m_build_context,
        m_registry_default,
        m_get_token,
    ) -> None:
        """
        Scenario:
        - no git mirrors in MIRRORS config
        - we fall back to primary URL and apply visibility there
        """
        provider = FakeProvider()
        registry = _FakeRegistry(provider)
        m_registry_default.return_value = registry
        m_get_token.return_value = types.SimpleNamespace(token="test-token")
        # Seed state: currently public (private=False), target private -> UPDATED
        provider.privacy[("git.veen.world", "me", "repo2")] = False
        repo = {"id": "repo2", "description": "Repo 2"}
        repo_dir = self._repo_dir("repo2")
        m_build_context.return_value = _mk_ctx(
            identifier="repo2",
            repo_dir=repo_dir,
            mirrors={
                # non-git mirror entries
                "pypi": "https://pypi.org/project/example/",
            },
        )
        m_determine_primary.return_value = "ssh://git.veen.world:2201/me/repo2.git"
        buf = io.StringIO()
        with redirect_stdout(buf):
            set_mirror_visibility(
                selected_repos=[repo],
                repositories_base_dir=self.tmp.name,
                all_repos=[repo],
                visibility="private",
                preview=False,
            )
        out = buf.getvalue()
        self.assertIn("[MIRROR VISIBILITY] applying to primary:", out)
        self.assertIn("[REMOTE VISIBILITY] UPDATED:", out)
        # "private" visibility means private=True afterwards.
        self.assertTrue(provider.privacy[("git.veen.world", "me", "repo2")])

    @patch("pkgmgr.actions.mirror.setup_cmd.probe_remote_reachable_detail")
    @patch("pkgmgr.actions.mirror.setup_cmd.ensure_remote_repository_for_url")
    @patch("pkgmgr.core.credentials.resolver.TokenResolver.get_token")
    @patch("pkgmgr.core.remote_provisioning.visibility.ProviderRegistry.default")
    @patch("pkgmgr.actions.mirror.setup_cmd.build_context")
    def test_setup_mirrors_provision_public_enforces_visibility_and_private_default(
        self,
        m_build_context,
        m_registry_default,
        m_get_token,
        m_ensure_remote_for_url,
        m_probe,
    ) -> None:
        """
        Covers the "mirror provision --public" semantics:
        - setup_mirrors(remote=True, ensure_remote=True, ensure_visibility="public")
        - ensure_remote_repository_for_url is called with private_default=False
        - then set_repo_visibility is applied (UPDATED/NOOP depending on current state)
        - git probing is mocked (no subprocess)
        """
        provider = FakeProvider()
        registry = _FakeRegistry(provider)
        m_registry_default.return_value = registry
        m_get_token.return_value = types.SimpleNamespace(token="test-token")
        # Make git probing always OK (no subprocess calls)
        m_probe.return_value = (True, "")
        # Seed provider: repo4 currently private=True, target public -> UPDATED
        provider.privacy[("git.veen.world", "me", "repo4")] = True
        repo = {"id": "repo4", "description": "Repo 4", "private": True}
        repo_dir = self._repo_dir("repo4")
        m_build_context.return_value = _mk_ctx(
            identifier="repo4",
            repo_dir=repo_dir,
            mirrors={
                "origin": "ssh://git.veen.world:2201/me/repo4.git",
            },
        )
        buf = io.StringIO()
        with redirect_stdout(buf):
            setup_mirrors(
                selected_repos=[repo],
                repositories_base_dir=self.tmp.name,
                all_repos=[repo],
                preview=False,
                local=False,
                remote=True,
                ensure_remote=True,
                ensure_visibility="public",
            )
        out = buf.getvalue()
        # ensure_remote_repository_for_url called and private_default overridden to False
        self.assertTrue(m_ensure_remote_for_url.called)
        _, kwargs = m_ensure_remote_for_url.call_args
        self.assertIn("private_default", kwargs)
        self.assertFalse(kwargs["private_default"])
        # Visibility should be enforced
        self.assertIn("[REMOTE VISIBILITY] UPDATED:", out)
        self.assertFalse(provider.privacy[("git.veen.world", "me", "repo4")])
# Allow running this test module directly (outside a pytest/unittest runner).
if __name__ == "__main__":
    unittest.main()

View File

@@ -38,7 +38,6 @@ class TestMirrorSetupCmd(unittest.TestCase):
ensure_remote=False,
)
# ensure_origin_remote(repo, ctx, preview) is called positionally in your code
m_ensure.assert_called_once()
args, kwargs = m_ensure.call_args
@@ -50,13 +49,13 @@ class TestMirrorSetupCmd(unittest.TestCase):
@patch("pkgmgr.actions.mirror.setup_cmd.build_context")
@patch("pkgmgr.actions.mirror.setup_cmd.determine_primary_remote_url")
@patch("pkgmgr.actions.mirror.setup_cmd.probe_remote_reachable")
@patch("pkgmgr.actions.mirror.setup_cmd.probe_remote_reachable_detail")
def test_setup_mirrors_remote_no_mirrors_probes_primary(
self, m_probe, m_primary, m_ctx
self, m_probe_detail, m_primary, m_ctx
) -> None:
m_ctx.return_value = self._ctx(repo_dir="/tmp/repo", resolved={})
m_primary.return_value = "git@github.com:alice/repo.git"
m_probe.return_value = True
m_probe_detail.return_value = (True, "")
repos = [{"provider": "github.com", "account": "alice", "repository": "repo"}]
setup_mirrors(
@@ -70,14 +69,14 @@ class TestMirrorSetupCmd(unittest.TestCase):
)
m_primary.assert_called()
m_probe.assert_called_once_with(
m_probe_detail.assert_called_once_with(
"git@github.com:alice/repo.git", cwd="/tmp/repo"
)
@patch("pkgmgr.actions.mirror.setup_cmd.build_context")
@patch("pkgmgr.actions.mirror.setup_cmd.probe_remote_reachable")
@patch("pkgmgr.actions.mirror.setup_cmd.probe_remote_reachable_detail")
def test_setup_mirrors_remote_with_mirrors_probes_each(
self, m_probe, m_ctx
self, m_probe_detail, m_ctx
) -> None:
m_ctx.return_value = self._ctx(
repo_dir="/tmp/repo",
@@ -86,7 +85,7 @@ class TestMirrorSetupCmd(unittest.TestCase):
"backup": "ssh://git@git.veen.world:2201/alice/repo.git",
},
)
m_probe.return_value = True
m_probe_detail.return_value = (True, "")
repos = [{"provider": "github.com", "account": "alice", "repository": "repo"}]
setup_mirrors(
@@ -99,12 +98,105 @@ class TestMirrorSetupCmd(unittest.TestCase):
ensure_remote=False,
)
self.assertEqual(m_probe.call_count, 2)
m_probe.assert_any_call("git@github.com:alice/repo.git", cwd="/tmp/repo")
m_probe.assert_any_call(
# Should probe BOTH git mirror URLs
self.assertEqual(m_probe_detail.call_count, 2)
m_probe_detail.assert_any_call("git@github.com:alice/repo.git", cwd="/tmp/repo")
m_probe_detail.assert_any_call(
"ssh://git@git.veen.world:2201/alice/repo.git", cwd="/tmp/repo"
)
    @patch("pkgmgr.actions.mirror.setup_cmd.build_context")
    @patch("pkgmgr.actions.mirror.setup_cmd.probe_remote_reachable_detail")
    @patch("pkgmgr.actions.mirror.setup_cmd.ensure_remote_repository_for_url")
    def test_setup_mirrors_remote_with_mirrors_ensure_remote_provisions_each(
        self, m_ensure_url, m_probe_detail, m_ctx
    ) -> None:
        """ensure_remote=True must provision every configured git mirror URL."""
        m_ctx.return_value = self._ctx(
            repo_dir="/tmp/repo",
            resolved={
                "origin": "git@github.com:alice/repo.git",
                "backup": "ssh://git@git.veen.world:2201/alice/repo.git",
            },
        )
        # All probes succeed; this test focuses on provisioning calls.
        m_probe_detail.return_value = (True, "")
        repos = [
            {
                "provider": "github.com",
                "account": "alice",
                "repository": "repo",
                "private": True,
                "description": "desc",
            }
        ]
        setup_mirrors(
            selected_repos=repos,
            repositories_base_dir="/tmp",
            all_repos=repos,
            preview=True,
            local=False,
            remote=True,
            ensure_remote=True,
        )
        # Provision both mirrors
        self.assertEqual(m_ensure_url.call_count, 2)
        m_ensure_url.assert_any_call(
            url="git@github.com:alice/repo.git",
            private_default=True,
            description="desc",
            preview=True,
        )
        m_ensure_url.assert_any_call(
            url="ssh://git@git.veen.world:2201/alice/repo.git",
            private_default=True,
            description="desc",
            preview=True,
        )
        # Still probes both
        self.assertEqual(m_probe_detail.call_count, 2)
    @patch("pkgmgr.actions.mirror.setup_cmd.build_context")
    @patch("pkgmgr.actions.mirror.setup_cmd.determine_primary_remote_url")
    @patch("pkgmgr.actions.mirror.setup_cmd.ensure_remote_repository_for_url")
    @patch("pkgmgr.actions.mirror.setup_cmd.probe_remote_reachable_detail")
    def test_setup_mirrors_remote_no_mirrors_ensure_remote_provisions_primary(
        self, m_probe_detail, m_ensure_url, m_primary, m_ctx
    ) -> None:
        """Without configured mirrors, provisioning falls back to the primary URL."""
        # Empty resolved-mirrors map forces the primary-URL fallback path.
        m_ctx.return_value = self._ctx(repo_dir="/tmp/repo", resolved={})
        m_primary.return_value = "git@github.com:alice/repo.git"
        m_probe_detail.return_value = (True, "")
        repos = [
            {
                "provider": "github.com",
                "account": "alice",
                "repository": "repo",
                "private": False,
                "description": "desc",
            }
        ]
        setup_mirrors(
            selected_repos=repos,
            repositories_base_dir="/tmp",
            all_repos=repos,
            preview=True,
            local=False,
            remote=True,
            ensure_remote=True,
        )
        # private_default mirrors the repo's "private" flag (False here).
        m_ensure_url.assert_called_once_with(
            url="git@github.com:alice/repo.git",
            private_default=False,
            description="desc",
            preview=True,
        )
        m_probe_detail.assert_called_once_with(
            "git@github.com:alice/repo.git", cwd="/tmp/repo"
        )
# Allow running this test module directly (outside a pytest/unittest runner).
if __name__ == "__main__":
    unittest.main()

View File

@@ -0,0 +1,177 @@
# tests/unit/pkgmgr/actions/mirror/test_visibility_cmd.py
from __future__ import annotations
import io
import unittest
from contextlib import redirect_stdout
from unittest.mock import patch, MagicMock
from pkgmgr.actions.mirror.visibility_cmd import set_mirror_visibility
class TestMirrorVisibilityCmd(unittest.TestCase):
    """Unit tests for set_mirror_visibility: validation, routing, and fallbacks."""

    def test_invalid_visibility_raises_value_error(self) -> None:
        # Anything other than "public"/"private" is rejected up front.
        with self.assertRaises(ValueError):
            set_mirror_visibility(
                selected_repos=[{"id": "x"}],
                repositories_base_dir="/tmp",
                all_repos=[],
                visibility="nope",
            )

    @patch("pkgmgr.actions.mirror.visibility_cmd.build_context")
    @patch("pkgmgr.actions.mirror.visibility_cmd.determine_primary_remote_url")
    def test_no_git_mirrors_and_no_primary_prints_nothing_to_do(
        self,
        mock_determine_primary: MagicMock,
        mock_build_ctx: MagicMock,
    ) -> None:
        ctx = MagicMock()
        ctx.identifier = "repo1"
        ctx.repo_dir = "/tmp/repo1"
        ctx.resolved_mirrors = {"pypi": "https://pypi.org/project/x/"}  # non-git
        mock_build_ctx.return_value = ctx
        # No primary URL either -> there is nothing to apply visibility to.
        mock_determine_primary.return_value = None
        buf = io.StringIO()
        with redirect_stdout(buf):
            set_mirror_visibility(
                selected_repos=[{"id": "repo1", "description": "desc"}],
                repositories_base_dir="/tmp",
                all_repos=[],
                visibility="public",
                preview=True,
            )
        out = buf.getvalue()
        self.assertIn("[MIRROR VISIBILITY] repo1", out)
        self.assertIn("Nothing to do.", out)

    @patch("pkgmgr.actions.mirror.visibility_cmd.build_context")
    @patch("pkgmgr.actions.mirror.visibility_cmd.determine_primary_remote_url")
    @patch("pkgmgr.actions.mirror.visibility_cmd.normalize_provider_host")
    @patch("pkgmgr.actions.mirror.visibility_cmd.parse_repo_from_git_url")
    @patch("pkgmgr.actions.mirror.visibility_cmd.set_repo_visibility")
    def test_applies_to_primary_when_no_git_mirrors(
        self,
        mock_set_repo_visibility: MagicMock,
        mock_parse: MagicMock,
        mock_norm: MagicMock,
        mock_determine_primary: MagicMock,
        mock_build_ctx: MagicMock,
    ) -> None:
        ctx = MagicMock()
        ctx.identifier = "repo1"
        ctx.repo_dir = "/tmp/repo1"
        ctx.resolved_mirrors = {}  # no mirrors
        mock_build_ctx.return_value = ctx
        primary = "ssh://git.veen.world:2201/me/repo1.git"
        mock_determine_primary.return_value = primary
        mock_parse.return_value = ("git.veen.world:2201", "me", "repo1")
        mock_norm.return_value = "git.veen.world:2201"
        mock_set_repo_visibility.return_value = MagicMock(
            status="skipped", message="Preview"
        )
        buf = io.StringIO()
        with redirect_stdout(buf):
            set_mirror_visibility(
                selected_repos=[{"id": "repo1", "description": "desc"}],
                repositories_base_dir="/tmp",
                all_repos=[],
                visibility="private",
                preview=True,
            )
        mock_set_repo_visibility.assert_called_once()
        _, kwargs = mock_set_repo_visibility.call_args
        self.assertEqual(
            kwargs["private"], True
        )  # visibility=private => desired_private=True
        out = buf.getvalue()
        self.assertIn("applying to primary", out)

    @patch("pkgmgr.actions.mirror.visibility_cmd.build_context")
    @patch("pkgmgr.actions.mirror.visibility_cmd.normalize_provider_host")
    @patch("pkgmgr.actions.mirror.visibility_cmd.parse_repo_from_git_url")
    @patch("pkgmgr.actions.mirror.visibility_cmd.set_repo_visibility")
    def test_applies_to_all_git_mirrors(
        self,
        mock_set_repo_visibility: MagicMock,
        mock_parse: MagicMock,
        mock_norm: MagicMock,
        mock_build_ctx: MagicMock,
    ) -> None:
        ctx = MagicMock()
        ctx.identifier = "repo1"
        ctx.repo_dir = "/tmp/repo1"
        ctx.resolved_mirrors = {
            "origin": "ssh://git.veen.world:2201/me/repo1.git",
            "backup": "git@git.veen.world:me/repo1.git",
            "notgit": "https://pypi.org/project/x/",
        }
        mock_build_ctx.return_value = ctx
        # For both URLs, parsing returns same repo
        mock_parse.return_value = ("git.veen.world", "me", "repo1")
        mock_norm.return_value = "git.veen.world"
        mock_set_repo_visibility.return_value = MagicMock(
            status="noop", message="Already public"
        )
        buf = io.StringIO()
        with redirect_stdout(buf):
            set_mirror_visibility(
                selected_repos=[{"id": "repo1", "description": "desc"}],
                repositories_base_dir="/tmp",
                all_repos=[],
                visibility="public",
                preview=False,
            )
        # Should be called for origin + backup (2), but not for notgit
        self.assertEqual(mock_set_repo_visibility.call_count, 2)
        # Each call should request desired private=False for "public"
        for call in mock_set_repo_visibility.call_args_list:
            _, kwargs = call
            self.assertEqual(kwargs["private"], False)
        out = buf.getvalue()
        self.assertIn("applying to mirror 'origin'", out)
        self.assertIn("applying to mirror 'backup'", out)

    @patch("pkgmgr.actions.mirror.visibility_cmd.build_context")
    @patch("pkgmgr.actions.mirror.visibility_cmd.determine_primary_remote_url")
    def test_primary_not_git_prints_nothing_to_do(
        self,
        mock_determine_primary: MagicMock,
        mock_build_ctx: MagicMock,
    ) -> None:
        ctx = MagicMock()
        ctx.identifier = "repo1"
        ctx.repo_dir = "/tmp/repo1"
        ctx.resolved_mirrors = {}
        mock_build_ctx.return_value = ctx
        # A primary exists but is not a git URL -> still nothing to apply.
        mock_determine_primary.return_value = "https://example.com/not-a-git-url"
        buf = io.StringIO()
        with redirect_stdout(buf):
            set_mirror_visibility(
                selected_repos=[{"id": "repo1"}],
                repositories_base_dir="/tmp",
                all_repos=[],
                visibility="public",
            )
        out = buf.getvalue()
        self.assertIn("Nothing to do.", out)
# Allow running this test module directly (outside a pytest/unittest runner).
if __name__ == "__main__":
    unittest.main()

View File

@@ -0,0 +1,135 @@
from __future__ import annotations
import io
import os
import sys
import tempfile
import types
import unittest
from pathlib import Path
from unittest.mock import patch
from pkgmgr.cli.commands import config as config_cmd
class FindDefaultsSourceDirTests(unittest.TestCase):
    """Resolution order of config_cmd._find_defaults_source_dir()."""

    @staticmethod
    def _make_pkg_root(base: Path) -> Path:
        """Create <base>/site-packages/pkgmgr and return that package root."""
        pkg_root = base / "site-packages" / "pkgmgr"
        pkg_root.mkdir(parents=True)
        return pkg_root

    @staticmethod
    def _fake_module(pkg_root: Path):
        """A stand-in 'pkgmgr' module whose __file__ lives under pkg_root."""
        return types.SimpleNamespace(__file__=str(pkg_root / "__init__.py"))

    def test_prefers_pkg_root_config_over_project_root_config(self):
        with tempfile.TemporaryDirectory() as td:
            pkg_root = self._make_pkg_root(Path(td))
            # Both candidate config dirs exist; the package-local one must win.
            (pkg_root / "config").mkdir(parents=True)
            (pkg_root.parent / "config").mkdir(parents=True)
            with patch.dict(sys.modules, {"pkgmgr": self._fake_module(pkg_root)}):
                found = config_cmd._find_defaults_source_dir()
            self.assertEqual(Path(found).resolve(), (pkg_root / "config").resolve())

    def test_falls_back_to_project_root_config(self):
        with tempfile.TemporaryDirectory() as td:
            pkg_root = self._make_pkg_root(Path(td))
            # Only the project-root config dir exists.
            (pkg_root.parent / "config").mkdir(parents=True)
            with patch.dict(sys.modules, {"pkgmgr": self._fake_module(pkg_root)}):
                found = config_cmd._find_defaults_source_dir()
            self.assertEqual(
                Path(found).resolve(), (pkg_root.parent / "config").resolve()
            )

    def test_returns_none_when_no_config_dirs_exist(self):
        with tempfile.TemporaryDirectory() as td:
            pkg_root = self._make_pkg_root(Path(td))
            with patch.dict(sys.modules, {"pkgmgr": self._fake_module(pkg_root)}):
                found = config_cmd._find_defaults_source_dir()
            self.assertIsNone(found)
class UpdateDefaultConfigsTests(unittest.TestCase):
    """Tests for config_cmd._update_default_configs (copying packaged defaults)."""

    def test_copies_yaml_files_skips_config_yaml(self):
        with tempfile.TemporaryDirectory() as td:
            root = Path(td)
            source_dir = root / "src"
            source_dir.mkdir()
            # Create files
            (source_dir / "a.yaml").write_text("x: 1\n", encoding="utf-8")
            (source_dir / "b.yml").write_text("y: 2\n", encoding="utf-8")
            (source_dir / "config.yaml").write_text(
                "should_not_copy: true\n", encoding="utf-8"
            )
            (source_dir / "notes.txt").write_text("nope\n", encoding="utf-8")
            home = root / "home"
            dest_cfg_dir = home / ".config" / "pkgmgr"
            dest_cfg_dir.mkdir(parents=True)
            user_config_path = str(dest_cfg_dir / "config.yaml")
            # Patch the source dir finder to our temp source_dir
            with patch.object(
                config_cmd, "_find_defaults_source_dir", return_value=str(source_dir)
            ):
                with patch.dict(os.environ, {"HOME": str(home)}):
                    config_cmd._update_default_configs(user_config_path)
            # Only *.yaml / *.yml defaults are copied; config.yaml is excluded.
            self.assertTrue((dest_cfg_dir / "a.yaml").is_file())
            self.assertTrue((dest_cfg_dir / "b.yml").is_file())
            # NOTE(review): this read assumes dest config.yaml exists after the
            # update call — presumably _update_default_configs creates it; verify.
            self.assertFalse(
                (dest_cfg_dir / "config.yaml")
                .read_text(encoding="utf-8")
                .startswith("should_not_copy")
            )
            # Ensure config.yaml was not overwritten (it may exist, but should remain original if we create it)
            # We'll strengthen: create an original config.yaml then re-run
            (dest_cfg_dir / "config.yaml").write_text(
                "original: true\n", encoding="utf-8"
            )
            with patch.object(
                config_cmd, "_find_defaults_source_dir", return_value=str(source_dir)
            ):
                with patch.dict(os.environ, {"HOME": str(home)}):
                    config_cmd._update_default_configs(user_config_path)
            self.assertEqual(
                (dest_cfg_dir / "config.yaml").read_text(encoding="utf-8"),
                "original: true\n",
            )

    def test_prints_warning_and_returns_when_no_source_dir(self):
        with tempfile.TemporaryDirectory() as td:
            root = Path(td)
            home = root / "home"
            dest_cfg_dir = home / ".config" / "pkgmgr"
            dest_cfg_dir.mkdir(parents=True)
            user_config_path = str(dest_cfg_dir / "config.yaml")
            buf = io.StringIO()
            # With no defaults source dir, the command warns and does nothing.
            with patch.object(
                config_cmd, "_find_defaults_source_dir", return_value=None
            ):
                with patch("sys.stdout", buf):
                    with patch.dict(os.environ, {"HOME": str(home)}):
                        config_cmd._update_default_configs(user_config_path)
            out = buf.getvalue()
            self.assertIn("[WARN] No config directory found", out)
# Allow running this test module directly (outside a pytest/unittest runner).
if __name__ == "__main__":
    unittest.main()

View File

@@ -0,0 +1,271 @@
from __future__ import annotations
import os
import sys
import tempfile
import types
import unittest
from pathlib import Path
from unittest.mock import patch
import yaml
from pkgmgr.core.config.load import (
_deep_merge,
_merge_repo_lists,
_load_layer_dir,
_load_defaults_from_package_or_project,
load_config,
)
class DeepMergeTests(unittest.TestCase):
    """_deep_merge: nested dicts merge key-by-key, everything else overrides."""

    def test_deep_merge_overrides_scalars_and_merges_dicts(self):
        base = {"a": 1, "b": {"x": 1, "y": 2}, "c": {"k": 1}}
        override = {"a": 2, "b": {"y": 99, "z": 3}, "c": 7}
        merged = _deep_merge(base, override)
        # Scalar from override wins.
        self.assertEqual(merged["a"], 2)
        # Nested dicts are merged key-by-key.
        for key, expected in {"x": 1, "y": 99, "z": 3}.items():
            self.assertEqual(merged["b"][key], expected)
        # A scalar override replaces a whole dict value.
        self.assertEqual(merged["c"], 7)
class MergeRepoListsTests(unittest.TestCase):
    """Tests for _merge_repo_lists (repos keyed by provider/account/repository)."""

    def test_merge_repo_lists_adds_new_repo_and_tracks_category(self):
        base = []
        new = [{"provider": "github", "account": "a", "repository": "r", "x": 1}]
        _merge_repo_lists(base, new, category_name="cat1")
        self.assertEqual(len(base), 1)
        self.assertEqual(base[0]["provider"], "github")
        self.assertEqual(base[0]["x"], 1)
        # The originating category file is recorded on the entry.
        self.assertIn("category_files", base[0])
        self.assertIn("cat1", base[0]["category_files"])

    def test_merge_repo_lists_merges_existing_repo_fields(self):
        base = [
            {
                "provider": "github",
                "account": "a",
                "repository": "r",
                "x": 1,
                "d": {"a": 1},
            }
        ]
        new = [
            {
                "provider": "github",
                "account": "a",
                "repository": "r",
                "x": 2,
                "d": {"b": 2},
            }
        ]
        _merge_repo_lists(base, new, category_name="cat2")
        self.assertEqual(len(base), 1)
        # Scalar fields are overridden, nested dicts are merged.
        self.assertEqual(base[0]["x"], 2)
        self.assertEqual(base[0]["d"]["a"], 1)
        self.assertEqual(base[0]["d"]["b"], 2)
        self.assertIn("cat2", base[0]["category_files"])

    def test_merge_repo_lists_incomplete_key_appends(self):
        base = []
        new = [{"foo": "bar"}]  # no provider/account/repository
        _merge_repo_lists(base, new, category_name="cat")
        # Entries without a full identity key are appended as-is.
        self.assertEqual(len(base), 1)
        self.assertEqual(base[0]["foo"], "bar")
        self.assertIn("cat", base[0].get("category_files", []))
class LoadLayerDirTests(unittest.TestCase):
    """Tests for _load_layer_dir: YAML files merged in sorted filename order."""

    def test_load_layer_dir_merges_directories_and_repos_across_files_sorted(self):
        with tempfile.TemporaryDirectory() as td:
            cfg_dir = Path(td)
            # 10_b.yaml should be applied after 01_a.yaml due to name sorting
            (cfg_dir / "01_a.yaml").write_text(
                yaml.safe_dump(
                    {
                        "directories": {"repositories": "/opt/Repos"},
                        "repositories": [
                            {
                                "provider": "github",
                                "account": "a",
                                "repository": "r1",
                                "x": 1,
                            }
                        ],
                    }
                ),
                encoding="utf-8",
            )
            (cfg_dir / "10_b.yaml").write_text(
                yaml.safe_dump(
                    {
                        "directories": {"binaries": "/usr/local/bin"},
                        "repositories": [
                            {
                                "provider": "github",
                                "account": "a",
                                "repository": "r1",
                                "x": 2,
                            },
                            {"provider": "github", "account": "a", "repository": "r2"},
                        ],
                    }
                ),
                encoding="utf-8",
            )
            defaults = _load_layer_dir(cfg_dir, skip_filename="config.yaml")
            # Directory settings from both files survive the merge.
            self.assertEqual(defaults["directories"]["repositories"], "/opt/Repos")
            self.assertEqual(defaults["directories"]["binaries"], "/usr/local/bin")
            # r1 merged: x becomes 2 and has category_files including both stems
            repos = defaults["repositories"]
            self.assertEqual(len(repos), 2)
            r1 = next(r for r in repos if r["repository"] == "r1")
            self.assertEqual(r1["x"], 2)
            self.assertIn("01_a", r1.get("category_files", []))
            self.assertIn("10_b", r1.get("category_files", []))

    def test_load_layer_dir_skips_config_yaml(self):
        with tempfile.TemporaryDirectory() as td:
            cfg_dir = Path(td)
            (cfg_dir / "config.yaml").write_text(
                yaml.safe_dump({"directories": {"x": 1}}), encoding="utf-8"
            )
            (cfg_dir / "defaults.yaml").write_text(
                yaml.safe_dump({"directories": {"x": 2}}), encoding="utf-8"
            )
            defaults = _load_layer_dir(cfg_dir, skip_filename="config.yaml")
            # only defaults.yaml should apply
            self.assertEqual(defaults["directories"]["x"], 2)
class DefaultsFromPackageOrProjectTests(unittest.TestCase):
    """Tests for _load_defaults_from_package_or_project() search roots."""

    def test_defaults_from_pkg_root_config_wins(self):
        with tempfile.TemporaryDirectory() as td:
            root = Path(td)
            pkg_root = root / "site-packages" / "pkgmgr"
            cfg_dir = pkg_root / "config"
            cfg_dir.mkdir(parents=True)
            (cfg_dir / "defaults.yaml").write_text(
                yaml.safe_dump(
                    {"directories": {"repositories": "/opt/Repos"}, "repositories": []}
                ),
                encoding="utf-8",
            )
            # Fake the installed pkgmgr package so __file__ points into pkg_root.
            fake_pkgmgr = types.SimpleNamespace(__file__=str(pkg_root / "__init__.py"))
            with patch.dict(sys.modules, {"pkgmgr": fake_pkgmgr}):
                defaults = _load_defaults_from_package_or_project()
            self.assertEqual(defaults["directories"]["repositories"], "/opt/Repos")

    def test_defaults_from_repo_root_src_layout(self):
        with tempfile.TemporaryDirectory() as td:
            repo_root = Path(td) / "repo"
            pkg_root = repo_root / "src" / "pkgmgr"
            cfg_dir = repo_root / "config"
            cfg_dir.mkdir(parents=True)
            pkg_root.mkdir(parents=True)
            # src-layout: config/ lives two levels above src/pkgmgr.
            (cfg_dir / "defaults.yaml").write_text(
                yaml.safe_dump(
                    {"directories": {"binaries": "/usr/local/bin"}, "repositories": []}
                ),
                encoding="utf-8",
            )
            fake_pkgmgr = types.SimpleNamespace(__file__=str(pkg_root / "__init__.py"))
            with patch.dict(sys.modules, {"pkgmgr": fake_pkgmgr}):
                defaults = _load_defaults_from_package_or_project()
            self.assertEqual(defaults["directories"]["binaries"], "/usr/local/bin")

    def test_defaults_returns_empty_when_no_config_found(self):
        with tempfile.TemporaryDirectory() as td:
            pkg_root = Path(td) / "site-packages" / "pkgmgr"
            pkg_root.mkdir(parents=True)
            fake_pkgmgr = types.SimpleNamespace(__file__=str(pkg_root / "__init__.py"))
            with patch.dict(sys.modules, {"pkgmgr": fake_pkgmgr}):
                defaults = _load_defaults_from_package_or_project()
            # No config dir anywhere -> empty default structure.
            self.assertEqual(defaults, {"directories": {}, "repositories": []})
class LoadConfigIntegrationUnitTests(unittest.TestCase):
    """End-to-end load_config(): user-dir defaults vs. packaged defaults layering."""

    def test_load_config_prefers_user_dir_defaults_over_package_defaults(self):
        with tempfile.TemporaryDirectory() as td:
            home = Path(td) / "home"
            user_cfg_dir = home / ".config" / "pkgmgr"
            user_cfg_dir.mkdir(parents=True)
            user_config_path = str(user_cfg_dir / "config.yaml")
            # user dir defaults exist -> should be used, package fallback must not matter
            (user_cfg_dir / "aa.yaml").write_text(
                yaml.safe_dump({"directories": {"repositories": "/USER/Repos"}}),
                encoding="utf-8",
            )
            (user_cfg_dir / "config.yaml").write_text(
                yaml.safe_dump({"directories": {"binaries": "/USER/bin"}}),
                encoding="utf-8",
            )
            with patch.dict(os.environ, {"HOME": str(home)}):
                merged = load_config(user_config_path)
            # Both the user-dir defaults and the user config contribute.
            self.assertEqual(merged["directories"]["repositories"], "/USER/Repos")
            self.assertEqual(merged["directories"]["binaries"], "/USER/bin")

    def test_load_config_falls_back_to_package_when_user_dir_has_no_defaults(self):
        with tempfile.TemporaryDirectory() as td:
            home = Path(td) / "home"
            user_cfg_dir = home / ".config" / "pkgmgr"
            user_cfg_dir.mkdir(parents=True)
            user_config_path = str(user_cfg_dir / "config.yaml")
            # Only user config exists, no other yaml defaults
            (user_cfg_dir / "config.yaml").write_text(
                yaml.safe_dump({"directories": {"x": 1}}), encoding="utf-8"
            )
            # Provide package defaults via fake pkgmgr + pkg_root/config
            root = Path(td) / "site-packages"
            pkg_root = root / "pkgmgr"
            cfg_dir = (
                root / "config"
            )  # NOTE: load.py checks multiple roots, including pkg_root.parent (=site-packages)
            pkg_root.mkdir(parents=True)
            cfg_dir.mkdir(parents=True)
            (cfg_dir / "defaults.yaml").write_text(
                yaml.safe_dump(
                    {"directories": {"repositories": "/PKG/Repos"}, "repositories": []}
                ),
                encoding="utf-8",
            )
            fake_pkgmgr = types.SimpleNamespace(__file__=str(pkg_root / "__init__.py"))
            with patch.dict(sys.modules, {"pkgmgr": fake_pkgmgr}):
                with patch.dict(os.environ, {"HOME": str(home)}):
                    merged = load_config(user_config_path)
            # directories are merged: defaults then user
            self.assertEqual(merged["directories"]["repositories"], "/PKG/Repos")
            self.assertEqual(merged["directories"]["x"], 1)
            self.assertIn("repositories", merged)
            self.assertIsInstance(merged["repositories"], list)
# Allow running this test module directly (outside a pytest/unittest runner).
if __name__ == "__main__":
    unittest.main()

View File

@@ -0,0 +1,145 @@
from __future__ import annotations
import importlib
import unittest
from unittest.mock import patch
from pkgmgr.core.git.errors import GitRunError
# IMPORTANT:
# Import the MODULE, not the function exported by pkgmgr.core.git.queries.__init__.
pr = importlib.import_module("pkgmgr.core.git.queries.probe_remote_reachable")
def _git_error(
    *,
    returncode: int,
    stderr: str = "",
    stdout: str = "",
    message: str = "git failed",
) -> GitRunError:
    """Build a GitRunError carrying the attributes pkgmgr.core.git.run attaches.

    The probe code under test reads ``returncode``, ``stderr`` and ``stdout``
    off the exception instance, so the factory mirrors that shape exactly.
    """
    err = GitRunError(message)
    err.returncode = returncode
    err.stdout = stdout
    err.stderr = stderr
    return err
class TestProbeRemoteReachableHelpers(unittest.TestCase):
    """Unit tests for the private line-picking helpers in probe_remote_reachable."""

    def test_first_useful_line_prefers_keyword_lines(self) -> None:
        blob = "\nerror:\n \nFATAL: Could not read from remote repository.\nmore\n"
        picked = pr._first_useful_line(blob)
        self.assertEqual(picked, "FATAL: Could not read from remote repository.")

    def test_first_useful_line_skips_plain_error_if_possible(self) -> None:
        blob = "error:\nsome other info\n"
        self.assertEqual(pr._first_useful_line(blob), "some other info")

    def test_first_useful_line_returns_empty_for_empty(self) -> None:
        self.assertEqual(pr._first_useful_line(" \n\n"), "")

    def test_looks_like_real_transport_error_true(self) -> None:
        line = "fatal: Could not read from remote repository."
        self.assertTrue(pr._looks_like_real_transport_error(line))

    def test_looks_like_real_transport_error_false(self) -> None:
        self.assertFalse(pr._looks_like_real_transport_error("some harmless output"))
class TestProbeRemoteReachableDetail(unittest.TestCase):
    """Behavioral tests for probe_remote_reachable_detail() with `run` mocked out."""

    @patch.object(pr, "run", return_value="")
    def test_detail_success_returns_true_empty_reason(self, m_run) -> None:
        # A clean run (no exception) means reachable with an empty reason.
        ok, reason = pr.probe_remote_reachable_detail(
            "git@github.com:alice/repo.git",
            cwd="/tmp",
        )
        self.assertTrue(ok)
        self.assertEqual(reason, "")
        m_run.assert_called_once()

    @patch.object(pr, "run")
    def test_detail_rc2_without_transport_indicators_treated_as_reachable(
        self, m_run
    ) -> None:
        # rc=2 but no transport/auth indicators => treat as reachable (empty repo)
        m_run.side_effect = _git_error(
            returncode=2,
            stderr="",
            stdout="",
            message="Git command failed (exit 2)",
        )
        ok, reason = pr.probe_remote_reachable_detail(
            "git@github.com:alice/empty.git",
            cwd="/tmp",
        )
        self.assertTrue(ok)
        # The reason string still explains the empty-repository interpretation.
        self.assertIn("empty repository", reason.lower())

    @patch.object(pr, "run")
    def test_detail_rc2_with_transport_indicators_is_not_reachable(self, m_run) -> None:
        # rc=2 but stderr indicates transport/auth problem => NOT reachable
        m_run.side_effect = _git_error(
            returncode=2,
            stderr="ERROR: Repository not found.",
            stdout="",
            message="Git command failed (exit 2)",
        )
        ok, reason = pr.probe_remote_reachable_detail(
            "git@github.com:alice/missing.git",
            cwd="/tmp",
        )
        self.assertFalse(ok)
        self.assertIn("repository not found", reason.lower())

    @patch.object(pr, "run")
    def test_detail_rc128_reports_reason(self, m_run) -> None:
        # rc=128 is a hard git failure; exit code and stderr must surface in the reason.
        m_run.side_effect = _git_error(
            returncode=128,
            stderr="fatal: Could not read from remote repository.",
            stdout="",
            message="Git command failed (exit 128)",
        )
        ok, reason = pr.probe_remote_reachable_detail(
            "ssh://git@host:2201/a/b.git",
            cwd="/tmp",
        )
        self.assertFalse(ok)
        self.assertIn("(exit 128)", reason.lower())
        self.assertIn("could not read from remote repository", reason.lower())

    @patch.object(pr, "run")
    def test_detail_adds_hint_if_reason_is_generic(self, m_run) -> None:
        # Generic failure: rc=128 but no stderr/stdout => should append hint
        m_run.side_effect = _git_error(
            returncode=128,
            stderr="",
            stdout="",
            message="",
        )
        url = "git@github.com:alice/repo.git"
        ok, reason = pr.probe_remote_reachable_detail(url, cwd="/tmp")
        self.assertFalse(ok)
        # The hint suggests the manual command a user could run to diagnose.
        self.assertIn("hint:", reason.lower())
        self.assertIn("git ls-remote --exit-code", reason.lower())

    @patch.object(pr, "probe_remote_reachable_detail", return_value=(True, ""))
    def test_probe_remote_reachable_delegates_to_detail(self, m_detail) -> None:
        # The boolean wrapper forwards its arguments unchanged to the detail variant.
        self.assertTrue(pr.probe_remote_reachable("x", cwd="/tmp"))
        m_detail.assert_called_once_with("x", cwd="/tmp")
# Allow running this test module directly (outside a pytest/unittest runner).
if __name__ == "__main__":
    unittest.main()

View File

@@ -0,0 +1,227 @@
# tests/unit/pkgmgr/core/remote_provisioning/test_visibility.py
from __future__ import annotations
import unittest
from unittest.mock import MagicMock
from pkgmgr.core.remote_provisioning.types import (
AuthError,
NetworkError,
PermissionError,
ProviderHint,
RepoSpec,
UnsupportedProviderError,
)
from pkgmgr.core.remote_provisioning.visibility import (
VisibilityOptions,
set_repo_visibility,
)
from pkgmgr.core.remote_provisioning.http.errors import HttpError
class TestSetRepoVisibility(unittest.TestCase):
    """Tests for set_repo_visibility(): state transitions and HTTP error mapping."""

    def _mk_provider(self, *, kind: str = "gitea") -> MagicMock:
        # Provider double exposing only the `kind` attribute the code inspects.
        p = MagicMock()
        p.kind = kind
        return p

    def _mk_registry(
        self, provider: MagicMock | None, providers: list[MagicMock] | None = None
    ) -> MagicMock:
        # Registry double: resolve() returns `provider`; `providers` lists all
        # candidates (used when a ProviderHint overrides resolution).
        reg = MagicMock()
        reg.resolve.return_value = provider
        reg.providers = (
            providers
            if providers is not None
            else ([provider] if provider is not None else [])
        )
        return reg

    def _mk_token_resolver(self, token: str = "TOKEN") -> MagicMock:
        # Token resolver double whose get_token() yields an object with `.token`.
        resolver = MagicMock()
        tok = MagicMock()
        tok.token = token
        resolver.get_token.return_value = tok
        return resolver

    def test_preview_returns_skipped_and_does_not_call_provider(self) -> None:
        # preview=True must short-circuit before any provider API call.
        provider = self._mk_provider()
        reg = self._mk_registry(provider)
        resolver = self._mk_token_resolver()
        spec = RepoSpec(host="git.veen.world", owner="me", name="repo", private=True)
        res = set_repo_visibility(
            spec,
            private=False,
            options=VisibilityOptions(preview=True),
            registry=reg,
            token_resolver=resolver,
        )
        self.assertEqual(res.status, "skipped")
        provider.get_repo_private.assert_not_called()
        provider.set_repo_private.assert_not_called()

    def test_unsupported_provider_raises(self) -> None:
        # No provider resolves for the host => UnsupportedProviderError.
        reg = self._mk_registry(provider=None, providers=[])
        spec = RepoSpec(host="unknown.host", owner="me", name="repo", private=True)
        with self.assertRaises(UnsupportedProviderError):
            set_repo_visibility(
                spec,
                private=True,
                registry=reg,
                token_resolver=self._mk_token_resolver(),
            )

    def test_notfound_when_provider_returns_none(self) -> None:
        # get_repo_private() -> None signals a missing repo; no write attempted.
        provider = self._mk_provider()
        provider.get_repo_private.return_value = None
        reg = self._mk_registry(provider)
        resolver = self._mk_token_resolver()
        spec = RepoSpec(host="git.veen.world", owner="me", name="repo", private=True)
        res = set_repo_visibility(
            spec,
            private=True,
            registry=reg,
            token_resolver=resolver,
        )
        self.assertEqual(res.status, "notfound")
        provider.set_repo_private.assert_not_called()

    def test_noop_when_already_desired(self) -> None:
        # Current visibility already matches the request => noop, no write.
        provider = self._mk_provider()
        provider.get_repo_private.return_value = True
        reg = self._mk_registry(provider)
        resolver = self._mk_token_resolver()
        spec = RepoSpec(host="git.veen.world", owner="me", name="repo", private=True)
        res = set_repo_visibility(
            spec,
            private=True,
            registry=reg,
            token_resolver=resolver,
        )
        self.assertEqual(res.status, "noop")
        provider.set_repo_private.assert_not_called()

    def test_updated_when_needs_change(self) -> None:
        # private=True currently, private=False requested => one write call
        # with the new value passed as the `private` keyword.
        provider = self._mk_provider()
        provider.get_repo_private.return_value = True
        reg = self._mk_registry(provider)
        resolver = self._mk_token_resolver()
        spec = RepoSpec(host="git.veen.world", owner="me", name="repo", private=True)
        res = set_repo_visibility(
            spec,
            private=False,
            registry=reg,
            token_resolver=resolver,
        )
        self.assertEqual(res.status, "updated")
        provider.set_repo_private.assert_called_once()
        args, kwargs = provider.set_repo_private.call_args
        self.assertEqual(kwargs.get("private"), False)

    def test_provider_hint_overrides_registry_resolution(self) -> None:
        # registry.resolve returns gitea provider, but hint forces github provider
        gitea = self._mk_provider(kind="gitea")
        github = self._mk_provider(kind="github")
        github.get_repo_private.return_value = True
        reg = self._mk_registry(gitea, providers=[gitea, github])
        resolver = self._mk_token_resolver()
        spec = RepoSpec(host="github.com", owner="me", name="repo", private=True)
        res = set_repo_visibility(
            spec,
            private=False,
            provider_hint=ProviderHint(kind="github"),
            registry=reg,
            token_resolver=resolver,
        )
        self.assertEqual(res.status, "updated")
        github.get_repo_private.assert_called_once()
        gitea.get_repo_private.assert_not_called()

    def test_http_error_401_maps_to_auth_error(self) -> None:
        provider = self._mk_provider()
        provider.get_repo_private.side_effect = HttpError(
            status=401, message="nope", body=""
        )
        reg = self._mk_registry(provider)
        resolver = self._mk_token_resolver()
        spec = RepoSpec(host="git.veen.world", owner="me", name="repo", private=True)
        with self.assertRaises(AuthError):
            set_repo_visibility(
                spec, private=True, registry=reg, token_resolver=resolver
            )

    def test_http_error_403_maps_to_permission_error(self) -> None:
        # NOTE(review): PermissionError here is the project type imported from
        # pkgmgr.core.remote_provisioning.types, which shadows the builtin.
        provider = self._mk_provider()
        provider.get_repo_private.side_effect = HttpError(
            status=403, message="nope", body=""
        )
        reg = self._mk_registry(provider)
        resolver = self._mk_token_resolver()
        spec = RepoSpec(host="git.veen.world", owner="me", name="repo", private=True)
        with self.assertRaises(PermissionError):
            set_repo_visibility(
                spec, private=True, registry=reg, token_resolver=resolver
            )

    def test_http_error_status_0_maps_to_network_error(self) -> None:
        # status=0 conventionally means the request never reached the server.
        provider = self._mk_provider()
        provider.get_repo_private.side_effect = HttpError(
            status=0, message="connection failed", body=""
        )
        reg = self._mk_registry(provider)
        resolver = self._mk_token_resolver()
        spec = RepoSpec(host="git.veen.world", owner="me", name="repo", private=True)
        with self.assertRaises(NetworkError):
            set_repo_visibility(
                spec, private=True, registry=reg, token_resolver=resolver
            )

    def test_http_error_other_maps_to_network_error(self) -> None:
        # Any other HTTP status (e.g. 500) also maps to NetworkError.
        provider = self._mk_provider()
        provider.get_repo_private.side_effect = HttpError(
            status=500, message="boom", body="server error"
        )
        reg = self._mk_registry(provider)
        resolver = self._mk_token_resolver()
        spec = RepoSpec(host="git.veen.world", owner="me", name="repo", private=True)
        with self.assertRaises(NetworkError):
            set_repo_visibility(
                spec, private=True, registry=reg, token_resolver=resolver
            )
# Allow running this test module directly (outside a pytest/unittest runner).
if __name__ == "__main__":
    unittest.main()