Compare commits

10 commits:

- 9485bc9e3f
- dcda23435d
- a69e81c44b
- 2ca004d056
- f7bd5bfd0b
- 2c15a4016b
- 9e3ce34626
- 1a13fcaa4e
- 48a0d1d458
- 783d2b921a
.github/workflows/ci.yml (8 changes, vendored)

@@ -28,8 +28,8 @@ jobs:
   test-virgin-root:
     uses: ./.github/workflows/test-virgin-root.yml

-  linter-shell:
-    uses: ./.github/workflows/linter-shell.yml
+  lint-shell:
+    uses: ./.github/workflows/lint-shell.yml

-  linter-python:
-    uses: ./.github/workflows/linter-python.yml
+  lint-python:
+    uses: ./.github/workflows/lint-python.yml
@@ -4,7 +4,7 @@ on:
   workflow_call:

 jobs:
-  linter-python:
+  lint-python:
     runs-on: ubuntu-latest

     steps:
@@ -4,7 +4,7 @@ on:
   workflow_call:

 jobs:
-  linter-shell:
+  lint-shell:
     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@v4
.github/workflows/mark-stable.yml (12 changes, vendored)

@@ -29,16 +29,16 @@ jobs:
   test-virgin-root:
     uses: ./.github/workflows/test-virgin-root.yml

-  linter-shell:
-    uses: ./.github/workflows/linter-shell.yml
+  lint-shell:
+    uses: ./.github/workflows/lint-shell.yml

-  linter-python:
-    uses: ./.github/workflows/linter-python.yml
+  lint-python:
+    uses: ./.github/workflows/lint-python.yml

   mark-stable:
     needs:
-      - linter-shell
-      - linter-python
+      - lint-shell
+      - lint-python
      - test-unit
      - test-integration
      - test-env-nix
CHANGELOG.md (20 changes)

@@ -1,3 +1,23 @@
+## [1.8.0] - 2025-12-15
+
+* *** New Features: ***
+- **Silent Updates**: You can now use the `--silent` flag during installs and updates to suppress error messages for individual repositories and get a single summary at the end. This ensures the process continues even if some repositories fail, while still preserving interactive checks when not in silent mode.
+- **Repository Scaffolding**: The process for creating new repositories has been improved. You can now use templates to scaffold repositories with a preview and automatic mirror setup.
+
+*** Bug Fixes: ***
+- **Pip Installation**: Pip is now installed automatically on all supported systems. This includes `python-pip` for Arch and `python3-pip` for CentOS, Debian, Fedora, and Ubuntu, ensuring that pip is available for Python package installations.
+- **Pacman Keyring**: Fixed an issue on Arch Linux where package installation would fail due to missing keys. The pacman keyring is now properly initialized before installing packages.
+
+
+## [1.7.2] - 2025-12-15
+
+* * Git mirrors are now resolved consistently (origin → MIRRORS file → config → default).
+* The `origin` remote is always enforced to use the primary URL for both fetch and push.
+* Additional mirrors are added as extra push targets without duplication.
+* Local and remote mirror setup behaves more predictably and consistently.
+* Improved test coverage ensures stable origin and push URL handling.
+
+
 ## [1.7.1] - 2025-12-14

 * Patched package-manager to kpmx to publish on pypi
@@ -32,7 +32,7 @@
     rec {
       pkgmgr = pyPkgs.buildPythonApplication {
         pname = "package-manager";
-        version = "1.7.1";
+        version = "1.8.0";

         # Use the git repo as source
         src = ./.;
@@ -49,6 +49,7 @@
      # Runtime dependencies (matches [project.dependencies] in pyproject.toml)
      propagatedBuildInputs = [
        pyPkgs.pyyaml
        pyPkgs.jinja2
        pyPkgs.pip
      ];

@@ -78,6 +79,7 @@
      pythonWithDeps = python.withPackages (ps: [
        ps.pip
        ps.pyyaml
        ps.jinja2
      ]);
    in
    {
@@ -1,7 +1,7 @@
 # Maintainer: Kevin Veen-Birkenbach <info@veen.world>

 pkgname=package-manager
-pkgver=1.7.1
+pkgver=1.8.0
 pkgrel=1
 pkgdesc="Local-flake wrapper for Kevin's package-manager (Nix-based)."
 arch=('any')
@@ -1,3 +1,25 @@
+package-manager (1.8.0-1) unstable; urgency=medium
+
+* *** New Features: ***
+- **Silent Updates**: You can now use the `--silent` flag during installs and updates to suppress error messages for individual repositories and get a single summary at the end. This ensures the process continues even if some repositories fail, while still preserving interactive checks when not in silent mode.
+- **Repository Scaffolding**: The process for creating new repositories has been improved. You can now use templates to scaffold repositories with a preview and automatic mirror setup.
+
+*** Bug Fixes: ***
+- **Pip Installation**: Pip is now installed automatically on all supported systems. This includes `python-pip` for Arch and `python3-pip` for CentOS, Debian, Fedora, and Ubuntu, ensuring that pip is available for Python package installations.
+- **Pacman Keyring**: Fixed an issue on Arch Linux where package installation would fail due to missing keys. The pacman keyring is now properly initialized before installing packages.
+
+-- Kevin Veen-Birkenbach <kevin@veen.world> Mon, 15 Dec 2025 13:37:42 +0100
+
+package-manager (1.7.2-1) unstable; urgency=medium
+
+* * Git mirrors are now resolved consistently (origin → MIRRORS file → config → default).
+* The `origin` remote is always enforced to use the primary URL for both fetch and push.
+* Additional mirrors are added as extra push targets without duplication.
+* Local and remote mirror setup behaves more predictably and consistently.
+* Improved test coverage ensures stable origin and push URL handling.
+
+-- Kevin Veen-Birkenbach <kevin@veen.world> Mon, 15 Dec 2025 00:53:26 +0100
+
 package-manager (1.7.1-1) unstable; urgency=medium

 * Patched package-manager to kpmx to publish on pypi
@@ -1,5 +1,5 @@
 Name: package-manager
-Version: 1.7.1
+Version: 1.8.0
 Release: 1%{?dist}
 Summary: Wrapper that runs Kevin's package-manager via Nix flake
@@ -74,6 +74,22 @@ echo ">>> package-manager removed. Nix itself was not removed."
 /usr/lib/package-manager/

 %changelog
+* Mon Dec 15 2025 Kevin Veen-Birkenbach <kevin@veen.world> - 1.8.0-1
+- *** New Features: ***
+- **Silent Updates**: You can now use the `--silent` flag during installs and updates to suppress error messages for individual repositories and get a single summary at the end. This ensures the process continues even if some repositories fail, while still preserving interactive checks when not in silent mode.
+- **Repository Scaffolding**: The process for creating new repositories has been improved. You can now use templates to scaffold repositories with a preview and automatic mirror setup.
+
+*** Bug Fixes: ***
+- **Pip Installation**: Pip is now installed automatically on all supported systems. This includes `python-pip` for Arch and `python3-pip` for CentOS, Debian, Fedora, and Ubuntu, ensuring that pip is available for Python package installations.
+- **Pacman Keyring**: Fixed an issue on Arch Linux where package installation would fail due to missing keys. The pacman keyring is now properly initialized before installing packages.
+
+* Mon Dec 15 2025 Kevin Veen-Birkenbach <kevin@veen.world> - 1.7.2-1
+- * Git mirrors are now resolved consistently (origin → MIRRORS file → config → default).
+* The `origin` remote is always enforced to use the primary URL for both fetch and push.
+* Additional mirrors are added as extra push targets without duplication.
+* Local and remote mirror setup behaves more predictably and consistently.
+* Improved test coverage ensures stable origin and push URL handling.
+
 * Sun Dec 14 2025 Kevin Veen-Birkenbach <kevin@veen.world> - 1.7.1-1
 - Patched package-manager to kpmx to publish on pypi
@@ -7,7 +7,7 @@ build-backend = "setuptools.build_meta"

 [project]
 name = "kpmx"
-version = "1.7.1"
+version = "1.8.0"
 description = "Kevin's package-manager tool (pkgmgr)"
 readme = "README.md"
 requires-python = ">=3.9"
@@ -21,6 +21,7 @@ authors = [
 dependencies = [
     "PyYAML>=6.0",
     "tomli; python_version < \"3.11\"",
+    "jinja2>=3.1"
 ]

 [project.urls]
@@ -6,6 +6,13 @@ SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
 echo "[arch/dependencies] Installing Arch build dependencies..."

 pacman -Syu --noconfirm

+if ! pacman-key --list-sigs &>/dev/null; then
+    echo "[arch/dependencies] Initializing pacman keyring..."
+    pacman-key --init
+    pacman-key --populate archlinux
+fi
+
 pacman -S --noconfirm --needed \
     base-devel \
     git \
@@ -13,6 +20,7 @@ pacman -S --noconfirm --needed \
     curl \
     ca-certificates \
     python \
+    python-pip \
     xz

 pacman -Scc --noconfirm
@@ -14,6 +14,7 @@ dnf -y install \
     curl-minimal \
     ca-certificates \
     python3 \
+    python3-pip \
     sudo \
     xz
@@ -15,6 +15,7 @@ DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends \
     ca-certificates \
     python3 \
     python3-venv \
+    python3-pip \
     xz-utils

 rm -rf /var/lib/apt/lists/*
@@ -14,6 +14,7 @@ dnf -y install \
     curl \
     ca-certificates \
     python3 \
+    python3-pip \
     xz

 dnf clean all
@@ -17,6 +17,7 @@ DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends \
     make \
     python3 \
     python3-venv \
+    python3-pip \
     ca-certificates \
     xz-utils
@@ -16,7 +16,7 @@ Responsibilities:
 from __future__ import annotations

 import os
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List, Optional, Tuple

 from pkgmgr.core.repository.identifier import get_repo_identifier
 from pkgmgr.core.repository.dir import get_repo_dir
@@ -93,6 +93,7 @@ def _verify_repo(
     repo_dir: str,
     no_verification: bool,
     identifier: str,
+    silent: bool,
 ) -> bool:
     """
     Verify a repository using the configured verification data.
@@ -111,10 +112,15 @@ def _verify_repo(
        print(f"Warning: Verification failed for {identifier}:")
        for err in errors:
            print(f"  - {err}")
        choice = input("Continue anyway? [y/N]: ").strip().lower()
        if choice != "y":
            print(f"Skipping installation for {identifier}.")
            return False

        if silent:
            # Non-interactive mode: continue with a warning.
            print(f"[Warning] Continuing despite verification failure for {identifier} (--silent).")
        else:
            choice = input("Continue anyway? [y/N]: ").strip().lower()
            if choice != "y":
                print(f"Skipping installation for {identifier}.")
                return False

    return True
@@ -163,6 +169,8 @@ def install_repos(
     clone_mode: str,
     update_dependencies: bool,
     force_update: bool = False,
+    silent: bool = False,
+    emit_summary: bool = True,
 ) -> None:
     """
     Install one or more repositories according to the configured installers
@@ -170,45 +178,72 @@ def install_repos(

    If force_update=True, installers of the currently active layer are allowed
    to run again (upgrade/refresh), even if that layer is already loaded.

    If silent=True, repository failures are downgraded to warnings and the
    overall command never exits non-zero because of per-repository failures.
    """
    pipeline = InstallationPipeline(INSTALLERS)
    failures: List[Tuple[str, str]] = []

    for repo in selected_repos:
        identifier = get_repo_identifier(repo, all_repos)

        repo_dir = _ensure_repo_dir(
            repo=repo,
            repositories_base_dir=repositories_base_dir,
            all_repos=all_repos,
            preview=preview,
            no_verification=no_verification,
            clone_mode=clone_mode,
            identifier=identifier,
        )
        if not repo_dir:
        try:
            repo_dir = _ensure_repo_dir(
                repo=repo,
                repositories_base_dir=repositories_base_dir,
                all_repos=all_repos,
                preview=preview,
                no_verification=no_verification,
                clone_mode=clone_mode,
                identifier=identifier,
            )
            if not repo_dir:
                failures.append((identifier, "clone/ensure repo directory failed"))
                continue

            if not _verify_repo(
                repo=repo,
                repo_dir=repo_dir,
                no_verification=no_verification,
                identifier=identifier,
                silent=silent,
            ):
                continue

            ctx = _create_context(
                repo=repo,
                identifier=identifier,
                repo_dir=repo_dir,
                repositories_base_dir=repositories_base_dir,
                bin_dir=bin_dir,
                all_repos=all_repos,
                no_verification=no_verification,
                preview=preview,
                quiet=quiet,
                clone_mode=clone_mode,
                update_dependencies=update_dependencies,
                force_update=force_update,
            )

            pipeline.run(ctx)

        except SystemExit as exc:
            code = exc.code if isinstance(exc.code, int) else str(exc.code)
            failures.append((identifier, f"installer failed (exit={code})"))
            if not quiet:
                print(f"[Warning] install: repository {identifier} failed (exit={code}). Continuing...")
            continue
        except Exception as exc:
            failures.append((identifier, f"unexpected error: {exc}"))
            if not quiet:
                print(f"[Warning] install: repository {identifier} hit an unexpected error: {exc}. Continuing...")
            continue

        if not _verify_repo(
            repo=repo,
            repo_dir=repo_dir,
            no_verification=no_verification,
            identifier=identifier,
        ):
            continue
    if failures and emit_summary and not quiet:
        print("\n[pkgmgr] Installation finished with warnings:")
        for ident, msg in failures:
            print(f"  - {ident}: {msg}")

        ctx = _create_context(
            repo=repo,
            identifier=identifier,
            repo_dir=repo_dir,
            repositories_base_dir=repositories_base_dir,
            bin_dir=bin_dir,
            all_repos=all_repos,
            no_verification=no_verification,
            preview=preview,
            quiet=quiet,
            clone_mode=clone_mode,
            update_dependencies=update_dependencies,
            force_update=force_update,
        )

        pipeline.run(ctx)
    if failures and not silent:
        raise SystemExit(1)
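A minimal caller sketch for the new silent/summary behaviour of `install_repos`, mirroring the positional call pattern that `UpdateManager` uses later in this compare. The placeholder variables (`selected`, `base_dir`, `bin_dir`, `all_repos`) are invented for illustration and would come from the CLI context in the real code:

```python
# Hedged sketch: argument values are placeholders, only the parameter list
# (including silent= and emit_summary=) is taken from the diff above.
from pkgmgr.actions.install import install_repos

selected = []        # repositories picked by the dispatcher (placeholder)
base_dir = "~/Repositories"
bin_dir = "~/.local/bin"
all_repos = []

install_repos(
    selected,
    base_dir,
    bin_dir,
    all_repos,
    False,            # no_verification
    False,            # preview
    False,            # quiet
    "ssh",            # clone_mode
    False,            # update_dependencies
    force_update=False,
    silent=True,      # downgrade per-repository failures to warnings
    emit_summary=True, # print one aggregated warning summary at the end
)
```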
@@ -1,20 +1,15 @@
from __future__ import annotations

import os
from typing import List, Optional, Set

from pkgmgr.core.command.run import run_command
from pkgmgr.core.git import GitError, run_git
from typing import List, Optional, Set

from .types import MirrorMap, RepoMirrorContext, Repository


def build_default_ssh_url(repo: Repository) -> Optional[str]:
    """
    Build a simple SSH URL from repo config if no explicit mirror is defined.

    Example: git@github.com:account/repository.git
    """
    provider = repo.get("provider")
    account = repo.get("account")
    name = repo.get("repository")
@@ -23,95 +18,82 @@ def build_default_ssh_url(repo: Repository) -> Optional[str]:
    if not provider or not account or not name:
        return None

    provider = str(provider)
    account = str(account)
    name = str(name)

    if port:
        return f"ssh://git@{provider}:{port}/{account}/{name}.git"

    # GitHub-style shorthand
    return f"git@{provider}:{account}/{name}.git"


def determine_primary_remote_url(
    repo: Repository,
    resolved_mirrors: MirrorMap,
    ctx: RepoMirrorContext,
) -> Optional[str]:
    """
    Determine the primary remote URL in a consistent way:

    1. resolved_mirrors["origin"]
    2. any resolved mirror (first by name)
    3. default SSH URL from provider/account/repository
    Priority order:
    1. origin from resolved mirrors
    2. MIRRORS file order
    3. config mirrors order
    4. default SSH URL
    """
    if "origin" in resolved_mirrors:
        return resolved_mirrors["origin"]
    resolved = ctx.resolved_mirrors

    if resolved_mirrors:
        first_name = sorted(resolved_mirrors.keys())[0]
        return resolved_mirrors[first_name]
    if resolved.get("origin"):
        return resolved["origin"]

    for mirrors in (ctx.file_mirrors, ctx.config_mirrors):
        for _, url in mirrors.items():
            if url:
                return url

    return build_default_ssh_url(repo)


def _safe_git_output(args: List[str], cwd: str) -> Optional[str]:
    """
    Run a Git command via run_git and return its stdout, or None on failure.
    """
    try:
        return run_git(args, cwd=cwd)
    except GitError:
        return None


def current_origin_url(repo_dir: str) -> Optional[str]:
    """
    Return the current URL for remote 'origin', or None if not present.
    """
    output = _safe_git_output(["remote", "get-url", "origin"], cwd=repo_dir)
    if not output:
        return None
    url = output.strip()
    return url or None


def has_origin_remote(repo_dir: str) -> bool:
    """
    Check whether a remote called 'origin' exists in the repository.
    """
    output = _safe_git_output(["remote"], cwd=repo_dir)
    if not output:
        return False
    names = output.split()
    return "origin" in names
    out = _safe_git_output(["remote"], cwd=repo_dir)
    return bool(out and "origin" in out.split())


def _ensure_push_urls_for_origin(
def _set_origin_fetch_and_push(repo_dir: str, url: str, preview: bool) -> None:
    fetch = f"git remote set-url origin {url}"
    push = f"git remote set-url --push origin {url}"

    if preview:
        print(f"[PREVIEW] Would run in {repo_dir!r}: {fetch}")
        print(f"[PREVIEW] Would run in {repo_dir!r}: {push}")
        return

    run_command(fetch, cwd=repo_dir, preview=False)
    run_command(push, cwd=repo_dir, preview=False)


def _ensure_additional_push_urls(
    repo_dir: str,
    mirrors: MirrorMap,
    primary: str,
    preview: bool,
) -> None:
    """
    Ensure that all mirror URLs are present as push URLs on 'origin'.
    """
    desired: Set[str] = {url for url in mirrors.values() if url}
    desired: Set[str] = {u for u in mirrors.values() if u and u != primary}
    if not desired:
        return

    existing_output = _safe_git_output(
    out = _safe_git_output(
        ["remote", "get-url", "--push", "--all", "origin"],
        cwd=repo_dir,
    )
    existing = set(existing_output.splitlines()) if existing_output else set()
    existing = set(out.splitlines()) if out else set()

    missing = sorted(desired - existing)
    for url in missing:
    for url in sorted(desired - existing):
        cmd = f"git remote set-url --add --push origin {url}"
        if preview:
            print(f"[PREVIEW] Would run in {repo_dir!r}: {cmd}")
        else:
            print(f"[INFO] Adding push URL to 'origin': {url}")
            run_command(cmd, cwd=repo_dir, preview=False)
@@ -120,60 +102,32 @@ def ensure_origin_remote(
    ctx: RepoMirrorContext,
    preview: bool,
) -> None:
    """
    Ensure that a usable 'origin' remote exists and has all push URLs.
    """
    repo_dir = ctx.repo_dir
    resolved_mirrors = ctx.resolved_mirrors

    if not os.path.isdir(os.path.join(repo_dir, ".git")):
        print(f"[WARN] {repo_dir} is not a Git repository (no .git directory).")
        print(f"[WARN] {repo_dir} is not a Git repository.")
        return

    url = determine_primary_remote_url(repo, resolved_mirrors)
    primary = determine_primary_remote_url(repo, ctx)
    if not primary:
        print("[WARN] No primary mirror URL could be determined.")
        return

    if not has_origin_remote(repo_dir):
        if not url:
            print(
                "[WARN] Could not determine URL for 'origin' remote. "
                "Please configure mirrors or provider/account/repository."
            )
            return

        cmd = f"git remote add origin {url}"
        cmd = f"git remote add origin {primary}"
        if preview:
            print(f"[PREVIEW] Would run in {repo_dir!r}: {cmd}")
        else:
            print(f"[INFO] Adding 'origin' remote in {repo_dir}: {url}")
            run_command(cmd, cwd=repo_dir, preview=False)
    else:
        current = current_origin_url(repo_dir)
        if current == url or not url:
            print(
                "[INFO] 'origin' already points to "
                f"{current or '<unknown>'} (no change needed)."
            )
        else:
            # We do not auto-change origin here, only log the mismatch.
            print(
                "[INFO] 'origin' exists with URL "
                f"{current or '<unknown>'}; not changing to {url}."
            )

    # Ensure all mirrors are present as push URLs
    _ensure_push_urls_for_origin(repo_dir, resolved_mirrors, preview)
    _set_origin_fetch_and_push(repo_dir, primary, preview)

    _ensure_additional_push_urls(repo_dir, ctx.resolved_mirrors, primary, preview)


def is_remote_reachable(url: str, cwd: Optional[str] = None) -> bool:
    """
    Check whether a remote repository is reachable via `git ls-remote`.

    This does NOT modify anything; it only probes the remote.
    """
    workdir = cwd or os.getcwd()
    try:
        # --exit-code → non-zero exit code if the remote does not exist
        run_git(["ls-remote", "--exit-code", url], cwd=workdir)
        run_git(["ls-remote", "--exit-code", url], cwd=cwd or os.getcwd())
        return True
    except GitError:
        return False
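A small illustration of the default-URL fallback that `determine_primary_remote_url` ends with. The module path and the fact that `port` is read from the repo mapping are assumptions (the excerpt does not show that line); the expected outputs follow the two return branches shown above:

```python
# Hedged sketch: import path and the repo dict values are invented examples.
from pkgmgr.actions.mirror.remote import build_default_ssh_url  # path assumed

repo_plain = {"provider": "github.com", "account": "kevin", "repository": "demo"}
print(build_default_ssh_url(repo_plain))
# expected (GitHub-style shorthand branch): git@github.com:kevin/demo.git

repo_with_port = {"provider": "git.example.org", "port": 2222,
                  "account": "kevin", "repository": "demo"}
print(build_default_ssh_url(repo_with_port))
# expected (explicit ssh:// branch, assuming "port" is picked up from the dict):
# ssh://git@git.example.org:2222/kevin/demo.git
```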
@@ -1,4 +1,3 @@
# src/pkgmgr/actions/mirror/remote_provision.py
from __future__ import annotations

from typing import List
@@ -19,36 +18,28 @@ def ensure_remote_repository(
    preview: bool,
) -> None:
    ctx = build_context(repo, repositories_base_dir, all_repos)
    resolved_mirrors = ctx.resolved_mirrors

    primary_url = determine_primary_remote_url(repo, resolved_mirrors)
    primary_url = determine_primary_remote_url(repo, ctx)
    if not primary_url:
        print("[INFO] No remote URL could be derived; skipping remote provisioning.")
        print("[INFO] No primary URL found; skipping remote provisioning.")
        return

    host_raw, owner_from_url, name_from_url = parse_repo_from_git_url(primary_url)
    host_raw, owner, name = parse_repo_from_git_url(primary_url)
    host = normalize_provider_host(host_raw)

    if not host or not owner_from_url or not name_from_url:
        print("[WARN] Could not derive host/owner/repository from URL; cannot ensure remote repo.")
        print(f"  url={primary_url!r}")
        print(f"  host={host!r}, owner={owner_from_url!r}, repository={name_from_url!r}")
    if not host or not owner or not name:
        print("[WARN] Could not parse remote URL:", primary_url)
        return

    print("------------------------------------------------------------")
    print(f"[REMOTE ENSURE] {ctx.identifier}")
    print(f"[REMOTE ENSURE] host: {host}")
    print("------------------------------------------------------------")

    spec = RepoSpec(
        host=str(host),
        owner=str(owner_from_url),
        name=str(name_from_url),
        host=host,
        owner=owner,
        name=name,
        private=bool(repo.get("private", True)),
        description=str(repo.get("description", "")),
    )

    provider_kind = str(repo.get("provider", "")).strip().lower() or None
    provider_kind = str(repo.get("provider", "")).lower() or None

    try:
        result = ensure_remote_repo(
@@ -66,5 +57,3 @@ def ensure_remote_repository(
        print(f"[REMOTE ENSURE] URL: {result.url}")
    except Exception as exc:  # noqa: BLE001
        print(f"[ERROR] Remote provisioning failed: {exc}")

    print()
@@ -1,4 +1,3 @@
# src/pkgmgr/actions/mirror/setup_cmd.py
from __future__ import annotations

from typing import List
@@ -9,6 +8,7 @@ from .remote_check import probe_mirror
from .remote_provision import ensure_remote_repository
from .types import Repository


def _setup_local_mirrors_for_repo(
    repo: Repository,
    repositories_base_dir: str,
@@ -22,7 +22,7 @@ def _setup_local_mirrors_for_repo(
    print(f"[MIRROR SETUP:LOCAL] dir: {ctx.repo_dir}")
    print("------------------------------------------------------------")

    ensure_origin_remote(repo, ctx, preview=preview)
    ensure_origin_remote(repo, ctx, preview)
    print()

@@ -34,7 +34,6 @@ def _setup_remote_mirrors_for_repo(
    ensure_remote: bool,
) -> None:
    ctx = build_context(repo, repositories_base_dir, all_repos)
    resolved_mirrors = ctx.resolved_mirrors

    print("------------------------------------------------------------")
    print(f"[MIRROR SETUP:REMOTE] {ctx.identifier}")
@@ -44,37 +43,28 @@ def _setup_remote_mirrors_for_repo(
    if ensure_remote:
        ensure_remote_repository(
            repo,
            repositories_base_dir=repositories_base_dir,
            all_repos=all_repos,
            preview=preview,
            repositories_base_dir,
            all_repos,
            preview,
        )

    if not resolved_mirrors:
        primary_url = determine_primary_remote_url(repo, resolved_mirrors)
        if not primary_url:
            print("[INFO] No mirrors configured and no primary URL available.")
            print()
    if not ctx.resolved_mirrors:
        primary = determine_primary_remote_url(repo, ctx)
        if not primary:
            return

        ok, error_message = probe_mirror(primary_url, ctx.repo_dir)
        if ok:
            print(f"[OK] primary: {primary_url}")
        else:
            print(f"[WARN] primary: {primary_url}")
            for line in error_message.splitlines():
                print(f"  {line}")

        ok, msg = probe_mirror(primary, ctx.repo_dir)
        print("[OK]" if ok else "[WARN]", primary)
        if msg:
            print(msg)
        print()
        return

    for name, url in sorted(resolved_mirrors.items()):
        ok, error_message = probe_mirror(url, ctx.repo_dir)
        if ok:
            print(f"[OK] {name}: {url}")
        else:
            print(f"[WARN] {name}: {url}")
            for line in error_message.splitlines():
                print(f"  {line}")
    for name, url in ctx.resolved_mirrors.items():
        ok, msg = probe_mirror(url, ctx.repo_dir)
        print(f"[OK] {name}: {url}" if ok else f"[WARN] {name}: {url}")
        if msg:
            print(msg)

    print()
@@ -91,17 +81,17 @@ def setup_mirrors(
    for repo in selected_repos:
        if local:
            _setup_local_mirrors_for_repo(
                repo=repo,
                repositories_base_dir=repositories_base_dir,
                all_repos=all_repos,
                preview=preview,
                repo,
                repositories_base_dir,
                all_repos,
                preview,
            )

        if remote:
            _setup_remote_mirrors_for_repo(
                repo=repo,
                repositories_base_dir=repositories_base_dir,
                all_repos=all_repos,
                preview=preview,
                ensure_remote=ensure_remote,
                repo,
                repositories_base_dir,
                all_repos,
                preview,
                ensure_remote,
            )
@@ -84,10 +84,13 @@ def publish(
         raise RuntimeError("No build artifacts found in dist/.")

     resolver = TokenResolver()
+
+    # Store PyPI token per OS user (keyring is already user-scoped).
+    # Do NOT scope by project name.
     token = resolver.get_token(
         provider_kind="pypi",
         host=target.host,
-        owner=target.project,
+        owner=None,
         options=ResolutionOptions(
             interactive=interactive,
             allow_prompt=allow_prompt,
@@ -1,143 +1,257 @@
from __future__ import annotations

import os
import re
import subprocess
from dataclasses import dataclass
from typing import Any, Dict, Optional, Tuple
from urllib.parse import urlparse

import yaml

from pkgmgr.actions.mirror.io import write_mirrors_file
from pkgmgr.actions.mirror.setup_cmd import setup_mirrors
from pkgmgr.actions.repository.scaffold import render_default_templates
from pkgmgr.core.command.alias import generate_alias
from pkgmgr.core.config.save import save_user_config

def create_repo(identifier, config_merged, user_config_path, bin_dir, remote=False, preview=False):
    """
    Creates a new repository by performing the following steps:

    1. Parses the identifier (provider:port/account/repository) and adds a new entry to the user config
       if it is not already present. The provider part is split into provider and port (if provided).
    2. Creates the local repository directory and initializes a Git repository.
    3. If --remote is set, checks for an existing "origin" remote (removing it if found),
       adds the remote using a URL built from provider, port, account, and repository,
       creates an initial commit (e.g. with a README.md), and pushes to the remote.
       The push is attempted on both "main" and "master" branches.
    """
    parts = identifier.split("/")
Repository = Dict[str, Any]

_NAME_RE = re.compile(r"^[a-z0-9_-]+$")


@dataclass(frozen=True)
class RepoParts:
    host: str
    port: Optional[str]
    owner: str
    name: str


def _run(cmd: str, cwd: str, preview: bool) -> None:
    if preview:
        print(f"[Preview] Would run in {cwd}: {cmd}")
        return
    subprocess.run(cmd, cwd=cwd, shell=True, check=True)


def _git_get(key: str) -> str:
    try:
        out = subprocess.run(
            f"git config --get {key}",
            shell=True,
            check=False,
            capture_output=True,
            text=True,
        )
        return (out.stdout or "").strip()
    except Exception:
        return ""


def _split_host_port(host_with_port: str) -> Tuple[str, Optional[str]]:
    if ":" in host_with_port:
        host, port = host_with_port.split(":", 1)
        return host, port or None
    return host_with_port, None


def _strip_git_suffix(name: str) -> str:
    return name[:-4] if name.endswith(".git") else name


def _parse_git_url(url: str) -> RepoParts:
    if url.startswith("git@") and "://" not in url:
        left, right = url.split(":", 1)
        host = left.split("@", 1)[1]
        path = right.lstrip("/")
        owner, name = path.split("/", 1)
        return RepoParts(host=host, port=None, owner=owner, name=_strip_git_suffix(name))

    parsed = urlparse(url)
    host = (parsed.hostname or "").strip()
    port = str(parsed.port) if parsed.port else None
    path = (parsed.path or "").strip("/")

    if not host or not path or "/" not in path:
        raise ValueError(f"Could not parse git URL: {url}")

    owner, name = path.split("/", 1)
    return RepoParts(host=host, port=port, owner=owner, name=_strip_git_suffix(name))


def _parse_identifier(identifier: str) -> RepoParts:
    ident = identifier.strip()

    if "://" in ident or ident.startswith("git@"):
        return _parse_git_url(ident)

    parts = ident.split("/")
    if len(parts) != 3:
        print("Identifier must be in the format 'provider:port/account/repository' (port is optional).")
        raise ValueError("Identifier must be URL or 'provider(:port)/owner/repo'.")

    host_with_port, owner, name = parts
    host, port = _split_host_port(host_with_port)
    return RepoParts(host=host, port=port, owner=owner, name=name)


def _ensure_valid_repo_name(name: str) -> None:
    if not name or not _NAME_RE.fullmatch(name):
        raise ValueError("Repository name must match: lowercase a-z, 0-9, '_' and '-'.")


def _repo_homepage(host: str, owner: str, name: str) -> str:
    return f"https://{host}/{owner}/{name}"


def _build_default_primary_url(parts: RepoParts) -> str:
    if parts.port:
        return f"ssh://git@{parts.host}:{parts.port}/{parts.owner}/{parts.name}.git"
    return f"git@{parts.host}:{parts.owner}/{parts.name}.git"


def _write_default_mirrors(repo_dir: str, primary: str, name: str, preview: bool) -> None:
    mirrors = {"origin": primary, "pypi": f"https://pypi.org/project/{name}/"}
    write_mirrors_file(repo_dir, mirrors, preview=preview)


def _git_init_and_initial_commit(repo_dir: str, preview: bool) -> None:
    _run("git init", cwd=repo_dir, preview=preview)
    _run("git add -A", cwd=repo_dir, preview=preview)

    if preview:
        print(f'[Preview] Would run in {repo_dir}: git commit -m "Initial commit"')
        return

    provider_with_port, account, repository = parts
    # Split provider and port if a colon is present.
    if ":" in provider_with_port:
        provider_name, port = provider_with_port.split(":", 1)
    else:
        provider_name = provider_with_port
        port = None
    subprocess.run('git commit -m "Initial commit"', cwd=repo_dir, shell=True, check=False)

    # Check if the repository is already present in the merged config (including port)
    exists = False
    for repo in config_merged.get("repositories", []):
        if (repo.get("provider") == provider_name and
                repo.get("account") == account and
                repo.get("repository") == repository):
            exists = True
            print(f"Repository {identifier} already exists in the configuration.")
            break

def _git_push_main_or_master(repo_dir: str, preview: bool) -> None:
    _run("git branch -M main", cwd=repo_dir, preview=preview)
    try:
        _run("git push -u origin main", cwd=repo_dir, preview=preview)
        return
    except subprocess.CalledProcessError:
        pass

    try:
        _run("git branch -M master", cwd=repo_dir, preview=preview)
        _run("git push -u origin master", cwd=repo_dir, preview=preview)
    except subprocess.CalledProcessError as exc:
        print(f"[WARN] Push failed: {exc}")


def create_repo(
    identifier: str,
    config_merged: Dict[str, Any],
    user_config_path: str,
    bin_dir: str,
    *,
    remote: bool = False,
    preview: bool = False,
) -> None:
    parts = _parse_identifier(identifier)
    _ensure_valid_repo_name(parts.name)

    directories = config_merged.get("directories") or {}
    base_dir = os.path.expanduser(str(directories.get("repositories", "~/Repositories")))
    repo_dir = os.path.join(base_dir, parts.host, parts.owner, parts.name)

    author_name = _git_get("user.name") or "Unknown Author"
    author_email = _git_get("user.email") or "unknown@example.invalid"

    homepage = _repo_homepage(parts.host, parts.owner, parts.name)
    primary_url = _build_default_primary_url(parts)

    repositories = config_merged.get("repositories") or []
    exists = any(
        (
            r.get("provider") == parts.host
            and r.get("account") == parts.owner
            and r.get("repository") == parts.name
        )
        for r in repositories
    )

    if not exists:
        # Create a new entry with an automatically generated alias.
        new_entry = {
            "provider": provider_name,
            "port": port,
            "account": account,
            "repository": repository,
            "alias": generate_alias({"repository": repository, "provider": provider_name, "account": account}, bin_dir, existing_aliases=set()),
            "verified": {}  # No initial verification info
        new_entry: Repository = {
            "provider": parts.host,
            "port": parts.port,
            "account": parts.owner,
            "repository": parts.name,
            "homepage": homepage,
            "alias": generate_alias(
                {"repository": parts.name, "provider": parts.host, "account": parts.owner},
                bin_dir,
                existing_aliases=set(),
            ),
            "verified": {},
        }
        # Load or initialize the user configuration.

        if os.path.exists(user_config_path):
            with open(user_config_path, "r") as f:
            with open(user_config_path, "r", encoding="utf-8") as f:
                user_config = yaml.safe_load(f) or {}
        else:
            user_config = {"repositories": []}

        user_config.setdefault("repositories", [])
        user_config["repositories"].append(new_entry)
        save_user_config(user_config, user_config_path)
        print(f"Repository {identifier} added to the configuration.")
        # Also update the merged configuration object.
        config_merged.setdefault("repositories", []).append(new_entry)

    # Create the local repository directory based on the configured base directory.
    base_dir = os.path.expanduser(config_merged["directories"]["repositories"])
    repo_dir = os.path.join(base_dir, provider_name, account, repository)
    if not os.path.exists(repo_dir):
        os.makedirs(repo_dir, exist_ok=True)
        print(f"Local repository directory created: {repo_dir}")
    else:
        print(f"Local repository directory already exists: {repo_dir}")

    # Initialize a Git repository if not already initialized.
    if not os.path.exists(os.path.join(repo_dir, ".git")):
        cmd_init = "git init"
        if preview:
            print(f"[Preview] Would execute: '{cmd_init}' in {repo_dir}")
            print(f"[Preview] Would save user config: {user_config_path}")
        else:
            subprocess.run(cmd_init, cwd=repo_dir, shell=True, check=True)
            print(f"Git repository initialized in {repo_dir}.")
            save_user_config(user_config, user_config_path)

        config_merged.setdefault("repositories", []).append(new_entry)
        repo = new_entry
        print(f"[INFO] Added repository to configuration: {parts.host}/{parts.owner}/{parts.name}")
    else:
        print("Git repository is already initialized.")
        repo = next(
            r
            for r in repositories
            if (
                r.get("provider") == parts.host
                and r.get("account") == parts.owner
                and r.get("repository") == parts.name
            )
        )
        print(f"[INFO] Repository already in configuration: {parts.host}/{parts.owner}/{parts.name}")

    if preview:
        print(f"[Preview] Would ensure directory exists: {repo_dir}")
    else:
        os.makedirs(repo_dir, exist_ok=True)

    tpl_context = {
        "provider": parts.host,
        "port": parts.port,
        "account": parts.owner,
        "repository": parts.name,
        "homepage": homepage,
        "author_name": author_name,
        "author_email": author_email,
        "license_text": f"All rights reserved by {author_name}",
        "primary_remote": primary_url,
    }

    render_default_templates(repo_dir, context=tpl_context, preview=preview)
    _git_init_and_initial_commit(repo_dir, preview=preview)

    _write_default_mirrors(repo_dir, primary=primary_url, name=parts.name, preview=preview)

    repo.setdefault("mirrors", {})
    repo["mirrors"].setdefault("origin", primary_url)
    repo["mirrors"].setdefault("pypi", f"https://pypi.org/project/{parts.name}/")

    setup_mirrors(
        selected_repos=[repo],
        repositories_base_dir=base_dir,
        all_repos=config_merged.get("repositories", []),
        preview=preview,
        local=True,
        remote=True,
        ensure_remote=bool(remote),
    )

    if remote:
        # Create a README.md if it does not exist to have content for an initial commit.
        readme_path = os.path.join(repo_dir, "README.md")
        if not os.path.exists(readme_path):
            if preview:
                print(f"[Preview] Would create README.md in {repo_dir}.")
            else:
                with open(readme_path, "w") as f:
                    f.write(f"# {repository}\n")
                subprocess.run("git add README.md", cwd=repo_dir, shell=True, check=True)
                subprocess.run('git commit -m "Initial commit"', cwd=repo_dir, shell=True, check=True)
                print("README.md created and initial commit made.")

        # Build the remote URL.
        if provider_name.lower() == "github.com":
            remote_url = f"git@{provider_name}:{account}/{repository}.git"
        else:
            if port:
                remote_url = f"ssh://git@{provider_name}:{port}/{account}/{repository}.git"
            else:
                remote_url = f"ssh://git@{provider_name}/{account}/{repository}.git"

        # Check if the remote "origin" already exists.
        cmd_list = "git remote"
        if preview:
            print(f"[Preview] Would check for existing remotes in {repo_dir}")
            remote_exists = False  # Assume no remote in preview mode.
        else:
            result = subprocess.run(cmd_list, cwd=repo_dir, shell=True, capture_output=True, text=True, check=True)
            remote_list = result.stdout.strip().split()
            remote_exists = "origin" in remote_list

        if remote_exists:
            # Remove the existing remote "origin".
            cmd_remove = "git remote remove origin"
            if preview:
                print(f"[Preview] Would execute: '{cmd_remove}' in {repo_dir}")
            else:
                subprocess.run(cmd_remove, cwd=repo_dir, shell=True, check=True)
                print("Existing remote 'origin' removed.")

        # Now add the new remote.
        cmd_remote = f"git remote add origin {remote_url}"
        if preview:
            print(f"[Preview] Would execute: '{cmd_remote}' in {repo_dir}")
        else:
            try:
                subprocess.run(cmd_remote, cwd=repo_dir, shell=True, check=True)
                print(f"Remote 'origin' added: {remote_url}")
            except subprocess.CalledProcessError:
                print(f"Failed to add remote using URL: {remote_url}.")

        # Push the initial commit to the remote repository
        cmd_push = "git push -u origin master"
        if preview:
            print(f"[Preview] Would execute: '{cmd_push}' in {repo_dir}")
        else:
            subprocess.run(cmd_push, cwd=repo_dir, shell=True, check=True)
            print("Initial push to the remote repository completed.")
        _git_push_main_or_master(repo_dir, preview=preview)
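A short sketch of the identifier forms the new `_parse_identifier` accepts, with invented example values; the import path is assumed from the module layout visible in this compare:

```python
# Hedged sketch: module path and example identifiers are assumptions,
# the parsing rules follow _parse_identifier / _parse_git_url above.
from pkgmgr.actions.repository.create import _parse_identifier  # path assumed

_parse_identifier("github.com/kevin/demo")
#   -> RepoParts(host="github.com", port=None, owner="kevin", name="demo")
_parse_identifier("git.example.org:2222/kevin/demo")
#   -> RepoParts(host="git.example.org", port="2222", owner="kevin", name="demo")
_parse_identifier("git@github.com:kevin/demo.git")
#   -> RepoParts(host="github.com", port=None, owner="kevin", name="demo")
_parse_identifier("https://github.com/kevin/demo.git")
#   -> RepoParts(host="github.com", port=None, owner="kevin", name="demo")
```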
src/pkgmgr/actions/repository/scaffold.py (new file, 105 lines)

@@ -0,0 +1,105 @@
from __future__ import annotations

import os
import subprocess
from pathlib import Path
from typing import Any, Dict, Optional

try:
    from jinja2 import Environment, FileSystemLoader, StrictUndefined
except Exception as exc:  # pragma: no cover
    Environment = None  # type: ignore[assignment]
    FileSystemLoader = None  # type: ignore[assignment]
    StrictUndefined = None  # type: ignore[assignment]
    _JINJA_IMPORT_ERROR = exc
else:
    _JINJA_IMPORT_ERROR = None


def _repo_root_from_here(anchor: Optional[Path] = None) -> str:
    """
    Prefer git root (robust in editable installs / different layouts).
    Fallback to a conservative relative parent lookup.
    """
    here = (anchor or Path(__file__)).resolve().parent
    try:
        r = subprocess.run(
            ["git", "rev-parse", "--show-toplevel"],
            cwd=str(here),
            check=False,
            capture_output=True,
            text=True,
        )
        if r.returncode == 0:
            top = (r.stdout or "").strip()
            if top:
                return top
    except Exception:
        pass

    # Fallback: src/pkgmgr/actions/repository/scaffold.py -> <repo root> = parents[5]
    p = (anchor or Path(__file__)).resolve()
    if len(p.parents) < 6:
        raise RuntimeError(f"Unexpected path depth for: {p}")
    return str(p.parents[5])


def _templates_dir() -> str:
    return os.path.join(_repo_root_from_here(), "templates", "default")


def render_default_templates(
    repo_dir: str,
    *,
    context: Dict[str, Any],
    preview: bool,
) -> None:
    """
    Render templates/default/*.j2 into repo_dir.
    Keeps create.py clean: create.py calls this function only.
    """
    tpl_dir = _templates_dir()
    if not os.path.isdir(tpl_dir):
        raise RuntimeError(f"Templates directory not found: {tpl_dir}")

    # Preview mode: do not require Jinja2 at all. We only print planned outputs.
    if preview:
        for root, _, files in os.walk(tpl_dir):
            for fn in files:
                if not fn.endswith(".j2"):
                    continue
                abs_src = os.path.join(root, fn)
                rel_src = os.path.relpath(abs_src, tpl_dir)
                rel_out = rel_src[:-3]
                print(f"[Preview] Would render template: {rel_src} -> {rel_out}")
        return

    if Environment is None or FileSystemLoader is None or StrictUndefined is None:
        raise RuntimeError(
            "Jinja2 is required for repo templates but is not available. "
            f"Import error: {_JINJA_IMPORT_ERROR}"
        )

    env = Environment(
        loader=FileSystemLoader(tpl_dir),
        undefined=StrictUndefined,
        autoescape=False,
        keep_trailing_newline=True,
    )

    for root, _, files in os.walk(tpl_dir):
        for fn in files:
            if not fn.endswith(".j2"):
                continue

            abs_src = os.path.join(root, fn)
            rel_src = os.path.relpath(abs_src, tpl_dir)
            rel_out = rel_src[:-3]
            abs_out = os.path.join(repo_dir, rel_out)

            os.makedirs(os.path.dirname(abs_out), exist_ok=True)
            template = env.get_template(rel_src)
            rendered = template.render(**context)

            with open(abs_out, "w", encoding="utf-8") as f:
                f.write(rendered)
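A hypothetical call into the scaffolding helper above; the context keys mirror the `tpl_context` dict built in `create_repo`, while the target directory and values are invented placeholders:

```python
# Hedged sketch: only the function signature and context keys come from the diff.
from pkgmgr.actions.repository.scaffold import render_default_templates

render_default_templates(
    "/tmp/demo-repo",  # placeholder target directory
    context={
        "provider": "github.com",
        "port": None,
        "account": "kevin",
        "repository": "demo",
        "homepage": "https://github.com/kevin/demo",
        "author_name": "Kevin Veen-Birkenbach",
        "author_email": "info@veen.world",
        "license_text": "All rights reserved by Kevin Veen-Birkenbach",
        "primary_remote": "git@github.com:kevin/demo.git",
    },
    preview=True,  # preview only prints the planned template -> output mapping
)
```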
@@ -3,7 +3,7 @@

from __future__ import annotations

from typing import Any, Iterable
from typing import Any, Iterable, List, Tuple

from pkgmgr.actions.update.system_updater import SystemUpdater

@@ -30,32 +30,73 @@ class UpdateManager:
        quiet: bool,
        update_dependencies: bool,
        clone_mode: str,
        silent: bool = False,
        force_update: bool = True,
    ) -> None:
        from pkgmgr.actions.install import install_repos
        from pkgmgr.actions.repository.pull import pull_with_verification
        from pkgmgr.core.repository.identifier import get_repo_identifier

        pull_with_verification(
            selected_repos,
            repositories_base_dir,
            all_repos,
            [],
            no_verification,
            preview,
        )
        failures: List[Tuple[str, str]] = []

        install_repos(
            selected_repos,
            repositories_base_dir,
            bin_dir,
            all_repos,
            no_verification,
            preview,
            quiet,
            clone_mode,
            update_dependencies,
            force_update=force_update,
        )
        for repo in list(selected_repos):
            identifier = get_repo_identifier(repo, all_repos)

            try:
                pull_with_verification(
                    [repo],
                    repositories_base_dir,
                    all_repos,
                    [],
                    no_verification,
                    preview,
                )
            except SystemExit as exc:
                code = exc.code if isinstance(exc.code, int) else str(exc.code)
                failures.append((identifier, f"pull failed (exit={code})"))
                if not quiet:
                    print(f"[Warning] update: pull failed for {identifier} (exit={code}). Continuing...")
                continue
            except Exception as exc:
                failures.append((identifier, f"pull failed: {exc}"))
                if not quiet:
                    print(f"[Warning] update: pull failed for {identifier}: {exc}. Continuing...")
                continue

            try:
                install_repos(
                    [repo],
                    repositories_base_dir,
                    bin_dir,
                    all_repos,
                    no_verification,
                    preview,
                    quiet,
                    clone_mode,
                    update_dependencies,
                    force_update=force_update,
                    silent=silent,
                    emit_summary=False,
                )
            except SystemExit as exc:
                code = exc.code if isinstance(exc.code, int) else str(exc.code)
                failures.append((identifier, f"install failed (exit={code})"))
                if not quiet:
                    print(f"[Warning] update: install failed for {identifier} (exit={code}). Continuing...")
                continue
            except Exception as exc:
                failures.append((identifier, f"install failed: {exc}"))
                if not quiet:
                    print(f"[Warning] update: install failed for {identifier}: {exc}. Continuing...")
                continue

        if failures and not quiet:
            print("\n[pkgmgr] Update finished with warnings:")
            for ident, msg in failures:
                print(f"  - {ident}: {msg}")

        if failures and not silent:
            raise SystemExit(1)

        if system_update:
            self._system_updater.run(preview=preview)
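The hunk above repeats the same continue-on-failure pattern that `install_repos` uses: collect failures, print one summary, and only exit non-zero when silent mode is off. A minimal, generic illustration of that pattern (names such as `run_all` and `process_one` are invented, not part of pkgmgr):

```python
# Generic sketch of the continue-on-failure / summary pattern used above.
from typing import Callable, Iterable, List, Tuple


def run_all(names: Iterable[str], process_one: Callable[[str], None],
            quiet: bool = False, silent: bool = False) -> None:
    failures: List[Tuple[str, str]] = []
    for name in names:
        try:
            process_one(name)
        except SystemExit as exc:
            # Normalize SystemExit codes the same way the diff does.
            code = exc.code if isinstance(exc.code, int) else str(exc.code)
            failures.append((name, f"failed (exit={code})"))
        except Exception as exc:
            failures.append((name, f"failed: {exc}"))

    if failures and not quiet:
        print("\nFinished with warnings:")
        for ident, msg in failures:
            print(f"  - {ident}: {msg}")

    # Outside silent mode, a single non-zero exit still signals the failures.
    if failures and not silent:
        raise SystemExit(1)
```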
@@ -1,31 +1,17 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
Release command wiring for the pkgmgr CLI.

This module implements the `pkgmgr release` subcommand on top of the
generic selection logic from cli.dispatch. It does not define its
own subparser; the CLI surface is configured in cli.parser.

Responsibilities:
- Take the parsed argparse.Namespace for the `release` command.
- Use the list of selected repositories provided by dispatch_command().
- Optionally list affected repositories when --list is set.
- For each selected repository, run pkgmgr.actions.release.release(...) in
  the context of that repository directory.
"""

from __future__ import annotations

import os
import sys
from typing import Any, Dict, List

from pkgmgr.actions.publish import publish as run_publish
from pkgmgr.actions.release import release as run_release
from pkgmgr.cli.context import CLIContext
from pkgmgr.core.repository.dir import get_repo_dir
from pkgmgr.core.repository.identifier import get_repo_identifier
from pkgmgr.actions.release import release as run_release


Repository = Dict[str, Any]
@@ -35,23 +21,10 @@ def handle_release(
    ctx: CLIContext,
    selected: List[Repository],
) -> None:
    """
    Handle the `pkgmgr release` subcommand.

    Flow:
      1) Use the `selected` repositories as computed by dispatch_command().
      2) If --list is given, print the identifiers of the selected repos
         and return without running any release.
      3) For each selected repository:
         - Resolve its identifier and local directory.
         - Change into that directory.
         - Call pkgmgr.actions.release.release(...) with the parsed options.
    """
    if not selected:
        print("[pkgmgr] No repositories selected for release.")
        return

    # List-only mode: show which repositories would be affected.
    if getattr(args, "list", False):
        print("[pkgmgr] Repositories that would be affected by this release:")
        for repo in selected:
@@ -62,29 +35,22 @@ def handle_release(
    for repo in selected:
        identifier = get_repo_identifier(repo, ctx.all_repositories)

        repo_dir = repo.get("directory")
        if not repo_dir:
            try:
                repo_dir = get_repo_dir(ctx.repositories_base_dir, repo)
            except Exception:
                repo_dir = None

        if not repo_dir or not os.path.isdir(repo_dir):
            print(
                f"[WARN] Skipping repository {identifier}: "
                "local directory does not exist."
            )
        try:
            repo_dir = repo.get("directory") or get_repo_dir(ctx.repositories_base_dir, repo)
        except Exception as exc:
            print(f"[WARN] Skipping repository {identifier}: failed to resolve directory: {exc}")
            continue

        print(
            f"[pkgmgr] Running release for repository {identifier} "
            f"in '{repo_dir}'..."
        )
        if not os.path.isdir(repo_dir):
            print(f"[WARN] Skipping repository {identifier}: directory missing.")
            continue

        print(f"[pkgmgr] Running release for repository {identifier}...")

        # Change to repo directory and invoke the helper.
        cwd_before = os.getcwd()
        try:
            os.chdir(repo_dir)

            run_release(
                pyproject_path="pyproject.toml",
                changelog_path="CHANGELOG.md",
@@ -94,5 +60,17 @@ def handle_release(
                force=getattr(args, "force", False),
                close=getattr(args, "close", False),
            )

            if not getattr(args, "no_publish", False):
                print(f"[pkgmgr] Running publish for repository {identifier}...")
                is_tty = sys.stdin.isatty()
                run_publish(
                    repo=repo,
                    repo_dir=repo_dir,
                    preview=getattr(args, "preview", False),
                    interactive=is_tty,
                    allow_prompt=is_tty,
                )

        finally:
            os.chdir(cwd_before)
@@ -68,6 +68,7 @@ def handle_repos_command(
         args.clone_mode,
         args.dependencies,
         force_update=getattr(args, "update", False),
+        silent=getattr(args, "silent", False),
     )
     return
@@ -105,6 +105,7 @@ def dispatch_command(args, ctx: CLIContext) -> None:

    if args.command == "update":
        from pkgmgr.actions.update import UpdateManager

        UpdateManager().run(
            selected_repos=selected,
            repositories_base_dir=ctx.repositories_base_dir,
@@ -116,6 +117,7 @@ def dispatch_command(args, ctx: CLIContext) -> None:
             quiet=args.quiet,
             update_dependencies=args.dependencies,
             clone_mode=args.clone_mode,
+            silent=getattr(args, "silent", False),
             force_update=True,
         )
         return
@@ -4,18 +4,18 @@ import argparse

from pkgmgr.cli.proxy import register_proxy_commands

from .common import SortedSubParsersAction
from .install_update import add_install_update_subparsers
from .config_cmd import add_config_subparsers
from .navigation_cmd import add_navigation_subparsers
from .branch_cmd import add_branch_subparsers
from .release_cmd import add_release_subparser
from .publish_cmd import add_publish_subparser
from .version_cmd import add_version_subparser
from .changelog_cmd import add_changelog_subparser
from .common import SortedSubParsersAction
from .config_cmd import add_config_subparsers
from .install_update import add_install_update_subparsers
from .list_cmd import add_list_subparser
from .make_cmd import add_make_subparsers
from .mirror_cmd import add_mirror_subparsers
from .navigation_cmd import add_navigation_subparsers
from .publish_cmd import add_publish_subparser
from .release_cmd import add_release_subparser
from .version_cmd import add_version_subparser


def create_parser(description_text: str) -> argparse.ArgumentParser:
@@ -23,12 +23,34 @@ def create_parser(description_text: str) -> argparse.ArgumentParser:
        description=description_text,
        formatter_class=argparse.RawTextHelpFormatter,
    )

    subparsers = parser.add_subparsers(
        dest="command",
        help="Subcommands",
        action=SortedSubParsersAction,
    )

    # create
    p_create = subparsers.add_parser(
        "create",
        help="Create a new repository (scaffold + config).",
    )
    p_create.add_argument(
        "identifiers",
        nargs="+",
        help="Repository identifier(s): URL or 'provider(:port)/owner/repo'.",
    )
    p_create.add_argument(
        "--remote",
        action="store_true",
        help="Also push an initial commit to the remote (main/master).",
    )
    p_create.add_argument(
        "--preview",
        action="store_true",
        help="Print actions without writing files or executing commands.",
    )

    add_install_update_subparsers(subparsers)
    add_config_subparsers(subparsers)
    add_navigation_subparsers(subparsers)
@@ -168,3 +168,10 @@ def add_install_update_arguments(subparser: argparse.ArgumentParser) -> None:
        default="ssh",
        help="Specify clone mode (default: ssh).",
    )

    _add_option_if_missing(
        subparser,
        "--silent",
        action="store_true",
        help="Continue with other repositories if one fails; downgrade errors to warnings.",
    )

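Note for readers of this hunk: it relies on a helper called _add_option_if_missing whose body is not part of this diff. A minimal sketch of what such a guard presumably does (an assumption for illustration, not the actual pkgmgr implementation): only add the flag when no action on the subparser already owns that option string, so a shared helper can be applied to several subparsers without argparse raising a conflict.

    import argparse

    def _add_option_if_missing(subparser: argparse.ArgumentParser, flag: str, **kwargs) -> None:
        # Hypothetical helper: skip add_argument() when any registered action
        # already claims this option string (avoids "conflicting option" errors).
        for action in subparser._actions:  # private but long-stable argparse attribute
            if flag in action.option_strings:
                return
        subparser.add_argument(flag, **kwargs)
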
@@ -21,22 +21,22 @@ def add_release_subparser(
            "and updating the changelog."
        ),
    )

    release_parser.add_argument(
        "release_type",
        choices=["major", "minor", "patch"],
        help="Type of version increment for the release (major, minor, patch).",
    )

    release_parser.add_argument(
        "-m",
        "--message",
        default=None,
        help=(
            "Optional release message to add to the changelog and tag."
        ),
        help="Optional release message to add to the changelog and tag.",
    )
    # Generic selection / preview / list / extra_args

    add_identifier_arguments(release_parser)
    # Close current branch after successful release

    release_parser.add_argument(
        "--close",
        action="store_true",
@@ -45,7 +45,7 @@ def add_release_subparser(
            "repository, if it is not main/master."
        ),
    )
    # Force: skip preview+confirmation and run release directly

    release_parser.add_argument(
        "-f",
        "--force",
@@ -55,3 +55,9 @@ def add_release_subparser(
            "release directly."
        ),
    )

    release_parser.add_argument(
        "--no-publish",
        action="store_true",
        help="Do not run publish automatically after a successful release.",
    )

5 templates/default/.gitignore.j2 Normal file
@@ -0,0 +1,5 @@
.venv/
dist/
build/
__pycache__/
*.pyc

1 templates/default/LICENSE.j2 Normal file
@@ -0,0 +1 @@
{{ license_text }}

6 templates/default/README.md.j2 Normal file
@@ -0,0 +1,6 @@
# {{ repository }}

Homepage: {{ homepage }}

## Author
{{ author_name }} <{{ author_email }}>

11 templates/default/flake.nix.j2 Normal file
@@ -0,0 +1,11 @@
{
  description = "{{ repository }}";
  inputs.nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable";
  outputs = { self, nixpkgs }:
    let system = "x86_64-linux"; pkgs = import nixpkgs { inherit system; };
    in {
      devShells.${system}.default = pkgs.mkShell {
        packages = with pkgs; [ python312 python312Packages.pytest python312Packages.ruff ];
      };
    };
}

21 templates/default/pyproject.toml.j2 Normal file
@@ -0,0 +1,21 @@
[build-system]
requires = ["setuptools>=68", "wheel"]
build-backend = "setuptools.build_meta"

[project]
name = "{{ repository }}"
version = "0.1.0"
description = ""
readme = "README.md"
requires-python = ">=3.10"
authors = [{ name = "{{ author_name }}", email = "{{ author_email }}" }]
license = { text = "{{ license_text }}" }
urls = { Homepage = "{{ homepage }}" }

dependencies = []

[tool.setuptools]
package-dir = {"" = "src"}

[tool.setuptools.packages.find]
where = ["src"]

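The files above are Jinja2 templates, so a scaffolder can render each *.j2 file from one context dict and drop the suffix when writing the result. A minimal, self-contained sketch of that idea (the loader directory, context values, and output path are assumptions for illustration, not pkgmgr's actual render_default_templates wiring):

    from jinja2 import Environment, FileSystemLoader

    # Hypothetical template directory; the real scaffolder resolves this internally.
    env = Environment(loader=FileSystemLoader("templates/default"), keep_trailing_newline=True)

    context = {
        "repository": "repo",
        "homepage": "https://github.com/acme/repo",
        "author_name": "Jane Doe",
        "author_email": "jane@example.com",
        "license_text": "MIT",
    }

    # Render README.md.j2 -> README.md; the same pattern applies to every template above.
    rendered = env.get_template("README.md.j2").render(**context)
    with open("README.md", "w", encoding="utf-8") as fh:
        fh.write(rendered)
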
42 tests/e2e/test_repos_create_preview_output.py Normal file
@@ -0,0 +1,42 @@
from __future__ import annotations

import io
import unittest
from contextlib import redirect_stdout
from unittest.mock import patch

from pkgmgr.actions.repository.create import create_repo


class TestE2ECreateRepoPreviewOutput(unittest.TestCase):
    def test_create_repo_preview_prints_expected_steps(self) -> None:
        cfg = {"directories": {"repositories": "/tmp/Repositories"}, "repositories": []}

        out = io.StringIO()
        with (
            redirect_stdout(out),
            patch("pkgmgr.actions.repository.create.os.path.exists", return_value=False),
            patch("pkgmgr.actions.repository.create.generate_alias", return_value="repo"),
            patch("pkgmgr.actions.repository.create.save_user_config"),
            patch("pkgmgr.actions.repository.create.os.makedirs"),
            patch("pkgmgr.actions.repository.create.render_default_templates"),
            patch("pkgmgr.actions.repository.create.write_mirrors_file"),
            patch("pkgmgr.actions.repository.create.setup_mirrors"),
            patch("pkgmgr.actions.repository.create.subprocess.run"),
        ):
            create_repo(
                "github.com/acme/repo",
                cfg,
                "/tmp/user.yml",
                "/tmp/bin",
                remote=False,
                preview=True,
            )

        s = out.getvalue()
        self.assertIn("[Preview] Would save user config:", s)
        self.assertIn("[Preview] Would ensure directory exists:", s)


if __name__ == "__main__":
    unittest.main()

@@ -96,6 +96,7 @@ class TestIntegrationUpdateAllshallowNoSystem(unittest.TestCase):
            "--clone-mode",
            "shallow",
            "--no-verification",
            "--silent",
        ]
        self._run_cmd(["pkgmgr", *args], label="pkgmgr", env=env)
        pkgmgr_help_debug()
@@ -110,6 +111,7 @@ class TestIntegrationUpdateAllshallowNoSystem(unittest.TestCase):
            "--clone-mode",
            "shallow",
            "--no-verification",
            "--silent",
        ]
        self._run_cmd(
            ["nix", "run", ".#pkgmgr", "--", *args],

66 tests/integration/test_release_publish_hook.py Normal file
@@ -0,0 +1,66 @@
from __future__ import annotations

import tempfile
import unittest
from types import SimpleNamespace
from unittest.mock import patch


class TestIntegrationReleasePublishHook(unittest.TestCase):
    def _ctx(self) -> SimpleNamespace:
        # Minimal CLIContext shape used by handle_release()
        return SimpleNamespace(
            repositories_base_dir="/tmp",
            all_repositories=[],
        )

    def _parse(self, argv: list[str]):
        from pkgmgr.cli.parser import create_parser

        parser = create_parser("pkgmgr test")
        return parser.parse_args(argv)

    def test_release_runs_publish_by_default_and_respects_tty(self) -> None:
        from pkgmgr.cli.commands.release import handle_release

        with tempfile.TemporaryDirectory() as td:
            selected = [{"directory": td}]

            # Go through real parser to ensure CLI surface is wired correctly
            args = self._parse(["release", "patch"])

            with patch("pkgmgr.cli.commands.release.run_release") as m_release, patch(
                "pkgmgr.cli.commands.release.run_publish"
            ) as m_publish, patch(
                "pkgmgr.cli.commands.release.sys.stdin.isatty", return_value=False
            ):
                handle_release(args=args, ctx=self._ctx(), selected=selected)

            m_release.assert_called_once()
            m_publish.assert_called_once()

            _, kwargs = m_publish.call_args
            self.assertEqual(kwargs["repo"], selected[0])
            self.assertEqual(kwargs["repo_dir"], td)
            self.assertFalse(kwargs["interactive"])
            self.assertFalse(kwargs["allow_prompt"])

    def test_release_skips_publish_when_no_publish_flag_set(self) -> None:
        from pkgmgr.cli.commands.release import handle_release

        with tempfile.TemporaryDirectory() as td:
            selected = [{"directory": td}]

            args = self._parse(["release", "patch", "--no-publish"])

            with patch("pkgmgr.cli.commands.release.run_release") as m_release, patch(
                "pkgmgr.cli.commands.release.run_publish"
            ) as m_publish:
                handle_release(args=args, ctx=self._ctx(), selected=selected)

            m_release.assert_called_once()
            m_publish.assert_not_called()


if __name__ == "__main__":
    unittest.main()

53 tests/integration/test_repos_create_preview.py Normal file
@@ -0,0 +1,53 @@
from __future__ import annotations

import importlib
import io
import unittest
from contextlib import redirect_stdout
from types import SimpleNamespace
from unittest.mock import patch


class TestIntegrationReposCreatePreview(unittest.TestCase):
    def test_repos_create_preview_wires_create_repo(self) -> None:
        # Import lazily to avoid hard-failing if the CLI module/function name differs.
        try:
            repos_mod = importlib.import_module("pkgmgr.cli.commands.repos")
        except Exception as exc:
            self.skipTest(f"CLI module not available: {exc}")

        handle = getattr(repos_mod, "handle_repos_command", None)
        if handle is None:
            self.skipTest("handle_repos_command not found in pkgmgr.cli.commands.repos")

        ctx = SimpleNamespace(
            repositories_base_dir="/tmp/Repositories",
            binaries_dir="/tmp/bin",
            all_repositories=[],
            config_merged={"directories": {"repositories": "/tmp/Repositories"}, "repositories": []},
            user_config_path="/tmp/user.yml",
        )

        args = SimpleNamespace(
            command="create",
            identifiers=["github.com/acme/repo"],
            remote=False,
            preview=True,
        )

        out = io.StringIO()
        with (
            redirect_stdout(out),
            patch("pkgmgr.cli.commands.repos.create_repo") as create_repo,
        ):
            handle(args, ctx, selected=[])

        create_repo.assert_called_once()
        called = create_repo.call_args.kwargs
        self.assertEqual(called["remote"], False)
        self.assertEqual(called["preview"], True)
        self.assertEqual(create_repo.call_args.args[0], "github.com/acme/repo")


if __name__ == "__main__":
    unittest.main()

110 tests/integration/test_update_silent_continues.py Normal file
@@ -0,0 +1,110 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from __future__ import annotations

import unittest
from unittest.mock import patch

from pkgmgr.actions.update.manager import UpdateManager


class TestUpdateSilentContinues(unittest.TestCase):
    def test_update_continues_on_failures_and_silent_controls_exit_code(self) -> None:
        """
        Integration test for UpdateManager:
        - pull failure on repo A should not stop repo B/C
        - install failure on repo B should not stop repo C
        - without silent -> SystemExit(1) at end if any failures
        - with silent -> no SystemExit even if there are failures
        """

        repos = [
            {"provider": "github", "account": "example", "repository": "repo-a"},
            {"provider": "github", "account": "example", "repository": "repo-b"},
            {"provider": "github", "account": "example", "repository": "repo-c"},
        ]

        # We patch the internal calls used by UpdateManager:
        # - pull_with_verification is called once per repo
        # - install_repos is called once per repo that successfully pulled
        #
        # We simulate:
        #   repo-a: pull fails
        #   repo-b: pull ok, install fails
        #   repo-c: pull ok, install ok
        pull_calls = []
        install_calls = []

        def pull_side_effect(selected_repos, *_args, **_kwargs):
            # selected_repos is a list with exactly one repo in our implementation.
            repo = selected_repos[0]
            pull_calls.append(repo["repository"])
            if repo["repository"] == "repo-a":
                raise SystemExit(2)
            return None

        def install_side_effect(selected_repos, *_args, **kwargs):
            repo = selected_repos[0]
            install_calls.append((repo["repository"], kwargs.get("silent"), kwargs.get("emit_summary")))
            if repo["repository"] == "repo-b":
                raise SystemExit(3)
            return None

        # Patch at the exact import locations used inside UpdateManager.run()
        with patch("pkgmgr.actions.repository.pull.pull_with_verification", side_effect=pull_side_effect), patch(
            "pkgmgr.actions.install.install_repos", side_effect=install_side_effect
        ):
            # 1) silent=True: should NOT raise (even though failures happened)
            UpdateManager().run(
                selected_repos=repos,
                repositories_base_dir="/tmp/repos",
                bin_dir="/tmp/bin",
                all_repos=repos,
                no_verification=True,
                system_update=False,
                preview=True,
                quiet=True,
                update_dependencies=False,
                clone_mode="shallow",
                silent=True,
                force_update=True,
            )

            # Ensure it tried all pulls, and installs happened for B and C only.
            self.assertEqual(pull_calls, ["repo-a", "repo-b", "repo-c"])
            self.assertEqual([r for r, _silent, _emit in install_calls], ["repo-b", "repo-c"])

            # Ensure UpdateManager suppressed install summary spam by passing emit_summary=False.
            for _repo_name, _silent, emit_summary in install_calls:
                self.assertFalse(emit_summary)

            # Reset tracking for the non-silent run
            pull_calls.clear()
            install_calls.clear()

            # 2) silent=False: should raise SystemExit(1) at end due to failures
            with self.assertRaises(SystemExit) as cm:
                UpdateManager().run(
                    selected_repos=repos,
                    repositories_base_dir="/tmp/repos",
                    bin_dir="/tmp/bin",
                    all_repos=repos,
                    no_verification=True,
                    system_update=False,
                    preview=True,
                    quiet=True,
                    update_dependencies=False,
                    clone_mode="shallow",
                    silent=False,
                    force_update=True,
                )
            self.assertEqual(cm.exception.code, 1)

            # Still must have processed all repos (continue-on-failure behavior).
            self.assertEqual(pull_calls, ["repo-a", "repo-b", "repo-c"])
            self.assertEqual([r for r, _silent, _emit in install_calls], ["repo-b", "repo-c"])


if __name__ == "__main__":
    unittest.main()

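The contract this test pins down — keep going per repository, collect failures, and let the silent flag decide the final exit code — is a plain aggregate-and-report loop. A minimal sketch of that pattern (an illustration consistent with the assertions above, not UpdateManager's actual run() signature or internals):

    def run_all(repos, update_one, *, silent: bool) -> None:
        # Hypothetical illustration of the continue-on-failure behavior asserted above.
        failures: list[str] = []
        for repo in repos:
            try:
                update_one(repo)
            except SystemExit as exc:  # an individual step signalled a failure
                failures.append(f"{repo['repository']}: exit {exc.code}")
        if failures:
            for line in failures:
                print(f"[WARN] {line}")
            if not silent:
                raise SystemExit(1)
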
@@ -1,6 +1,3 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from __future__ import annotations

import unittest
@@ -9,117 +6,61 @@ from unittest.mock import patch
from pkgmgr.actions.mirror.git_remote import (
    build_default_ssh_url,
    determine_primary_remote_url,
    current_origin_url,
    has_origin_remote,
)
from pkgmgr.actions.mirror.types import MirrorMap, Repository
from pkgmgr.actions.mirror.types import RepoMirrorContext


class TestMirrorGitRemote(unittest.TestCase):
    """
    Unit tests for SSH URL and primary remote selection logic.
    """
    def _ctx(self, *, file=None, config=None) -> RepoMirrorContext:
        return RepoMirrorContext(
            identifier="repo",
            repo_dir="/tmp/repo",
            config_mirrors=config or {},
            file_mirrors=file or {},
        )

    def test_build_default_ssh_url_without_port(self) -> None:
        repo: Repository = {
    def test_build_default_ssh_url(self) -> None:
        repo = {
            "provider": "github.com",
            "account": "kevinveenbirkenbach",
            "repository": "package-manager",
            "account": "alice",
            "repository": "repo",
        }
        self.assertEqual(
            build_default_ssh_url(repo),
            "git@github.com:alice/repo.git",
        )

        url = build_default_ssh_url(repo)
        self.assertEqual(url, "git@github.com:kevinveenbirkenbach/package-manager.git")
    def test_determine_primary_prefers_origin(self) -> None:
        repo = {"provider": "github.com", "account": "alice", "repository": "repo"}
        ctx = self._ctx(config={"origin": "git@github.com:alice/repo.git"})
        self.assertEqual(
            determine_primary_remote_url(repo, ctx),
            "git@github.com:alice/repo.git",
        )

    def test_build_default_ssh_url_with_port(self) -> None:
        repo: Repository = {
            "provider": "code.cymais.cloud",
            "account": "kevinveenbirkenbach",
            "repository": "pkgmgr",
            "port": 2201,
        }
    def test_determine_primary_uses_file_order(self) -> None:
        repo = {"provider": "github.com", "account": "alice", "repository": "repo"}
        ctx = self._ctx(
            file={
                "first": "git@a/first.git",
                "second": "git@a/second.git",
            }
        )
        self.assertEqual(
            determine_primary_remote_url(repo, ctx),
            "git@a/first.git",
        )

        url = build_default_ssh_url(repo)
        self.assertEqual(url, "ssh://git@code.cymais.cloud:2201/kevinveenbirkenbach/pkgmgr.git")
    def test_determine_primary_fallback_default(self) -> None:
        repo = {"provider": "github.com", "account": "alice", "repository": "repo"}
        ctx = self._ctx()
        self.assertEqual(
            determine_primary_remote_url(repo, ctx),
            "git@github.com:alice/repo.git",
        )

    def test_build_default_ssh_url_missing_fields_returns_none(self) -> None:
        repo: Repository = {
            "provider": "github.com",
            "account": "kevinveenbirkenbach",
        }

        url = build_default_ssh_url(repo)
        self.assertIsNone(url)

    def test_determine_primary_remote_url_prefers_origin_in_resolved_mirrors(self) -> None:
        repo: Repository = {
            "provider": "github.com",
            "account": "kevinveenbirkenbach",
            "repository": "package-manager",
        }
        mirrors: MirrorMap = {
            "origin": "git@github.com:kevinveenbirkenbach/package-manager.git",
            "backup": "ssh://git@git.veen.world:2201/kevinveenbirkenbach/pkgmgr.git",
        }

        url = determine_primary_remote_url(repo, mirrors)
        self.assertEqual(url, "git@github.com:kevinveenbirkenbach/package-manager.git")

    def test_determine_primary_remote_url_uses_any_mirror_if_no_origin(self) -> None:
        repo: Repository = {
            "provider": "github.com",
            "account": "kevinveenbirkenbach",
            "repository": "package-manager",
        }
        mirrors: MirrorMap = {
            "backup": "ssh://git@git.veen.world:2201/kevinveenbirkenbach/pkgmgr.git",
            "mirror2": "ssh://git@code.cymais.cloud:2201/kevinveenbirkenbach/pkgmgr.git",
        }

        url = determine_primary_remote_url(repo, mirrors)
        self.assertEqual(url, "ssh://git@git.veen.world:2201/kevinveenbirkenbach/pkgmgr.git")

    def test_determine_primary_remote_url_falls_back_to_default_ssh(self) -> None:
        repo: Repository = {
            "provider": "github.com",
            "account": "kevinveenbirkenbach",
            "repository": "package-manager",
        }
        mirrors: MirrorMap = {}

        url = determine_primary_remote_url(repo, mirrors)
        self.assertEqual(url, "git@github.com:kevinveenbirkenbach/package-manager.git")

    @patch("pkgmgr.actions.mirror.git_remote.run_git")
    def test_current_origin_url_returns_value(self, mock_run_git) -> None:
        mock_run_git.return_value = "git@github.com:alice/repo.git\n"
        self.assertEqual(current_origin_url("/tmp/repo"), "git@github.com:alice/repo.git")
        mock_run_git.assert_called_once_with(["remote", "get-url", "origin"], cwd="/tmp/repo")

    @patch("pkgmgr.actions.mirror.git_remote.run_git")
    def test_current_origin_url_returns_none_on_git_error(self, mock_run_git) -> None:
        from pkgmgr.core.git import GitError

        mock_run_git.side_effect = GitError("fail")
        self.assertIsNone(current_origin_url("/tmp/repo"))

    @patch("pkgmgr.actions.mirror.git_remote.run_git")
    def test_has_origin_remote_true(self, mock_run_git) -> None:
        mock_run_git.return_value = "origin\nupstream\n"
    @patch("pkgmgr.actions.mirror.git_remote._safe_git_output")
    def test_has_origin_remote(self, m_out) -> None:
        m_out.return_value = "origin\nupstream\n"
        self.assertTrue(has_origin_remote("/tmp/repo"))
        mock_run_git.assert_called_once_with(["remote"], cwd="/tmp/repo")

    @patch("pkgmgr.actions.mirror.git_remote.run_git")
    def test_has_origin_remote_false_on_missing_remote(self, mock_run_git) -> None:
        mock_run_git.return_value = "upstream\n"
        self.assertFalse(has_origin_remote("/tmp/repo"))

    @patch("pkgmgr.actions.mirror.git_remote.run_git")
    def test_has_origin_remote_false_on_git_error(self, mock_run_git) -> None:
        from pkgmgr.core.git import GitError

        mock_run_git.side_effect = GitError("fail")
        self.assertFalse(has_origin_remote("/tmp/repo"))


if __name__ == "__main__":
    unittest.main()

@@ -0,0 +1,50 @@
from __future__ import annotations

import unittest
from unittest.mock import patch

from pkgmgr.actions.mirror.git_remote import ensure_origin_remote
from pkgmgr.actions.mirror.types import RepoMirrorContext


class TestGitRemotePrimaryPush(unittest.TestCase):
    def test_origin_created_and_extra_push_added(self) -> None:
        repo = {"provider": "github.com", "account": "alice", "repository": "repo"}
        ctx = RepoMirrorContext(
            identifier="repo",
            repo_dir="/tmp/repo",
            config_mirrors={},
            file_mirrors={
                "primary": "git@github.com:alice/repo.git",
                "backup": "git@github.com:alice/repo-backup.git",
            },
        )

        executed: list[str] = []

        def fake_run(cmd: str, cwd: str, preview: bool) -> None:
            executed.append(cmd)

        def fake_git(args, cwd):
            if args == ["remote"]:
                return ""
            if args == ["remote", "get-url", "--push", "--all", "origin"]:
                return "git@github.com:alice/repo.git\n"
            return ""

        with patch("os.path.isdir", return_value=True), patch(
            "pkgmgr.actions.mirror.git_remote.run_command", side_effect=fake_run
        ), patch(
            "pkgmgr.actions.mirror.git_remote._safe_git_output", side_effect=fake_git
        ):
            ensure_origin_remote(repo, ctx, preview=False)

        self.assertEqual(
            executed,
            [
                "git remote add origin git@github.com:alice/repo.git",
                "git remote set-url origin git@github.com:alice/repo.git",
                "git remote set-url --push origin git@github.com:alice/repo.git",
                "git remote set-url --add --push origin git@github.com:alice/repo-backup.git",
            ],
        )

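Taken together, these tests pin down a precedence for the primary remote URL: an explicit "origin" mirror wins, otherwise the first entry from the mirrors mapping, otherwise a default SSH URL built from provider/account/repository (with an ssh:// form when a port is set, and None when fields are missing). A minimal sketch of a resolver with that contract (an illustration consistent with the assertions above, not pkgmgr's actual determine_primary_remote_url):

    def resolve_primary_url(repo: dict, mirrors: dict[str, str]) -> str | None:
        # Hypothetical resolver mirroring the precedence the tests assert:
        # 1. explicit "origin" mirror, 2. first configured mirror, 3. default SSH URL.
        if "origin" in mirrors:
            return mirrors["origin"]
        for url in mirrors.values():
            return url  # dicts preserve insertion order, so this is the first entry
        provider = repo.get("provider")
        account = repo.get("account")
        repository = repo.get("repository")
        if not (provider and account and repository):
            return None
        if repo.get("port"):
            return f"ssh://git@{provider}:{repo['port']}/{account}/{repository}.git"
        return f"git@{provider}:{account}/{repository}.git"
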
@@ -1,123 +1,101 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from __future__ import annotations

import unittest
from unittest.mock import MagicMock, PropertyMock, patch
from unittest.mock import patch

from pkgmgr.actions.mirror.setup_cmd import setup_mirrors
from pkgmgr.actions.mirror.types import RepoMirrorContext


class TestMirrorSetupCmd(unittest.TestCase):
    """
    Unit tests for mirror setup orchestration (local + remote).
    """

    @patch("pkgmgr.actions.mirror.setup_cmd.ensure_origin_remote")
    @patch("pkgmgr.actions.mirror.setup_cmd.build_context")
    def test_setup_mirrors_local_calls_ensure_origin_remote(
    def _ctx(
        self,
        mock_build_context,
        mock_ensure_origin,
    ) -> None:
        ctx = MagicMock()
        ctx.identifier = "repo-id"
        ctx.repo_dir = "/tmp/repo"
        ctx.config_mirrors = {}
        ctx.file_mirrors = {}
        type(ctx).resolved_mirrors = PropertyMock(return_value={})
        mock_build_context.return_value = ctx
        *,
        repo_dir: str = "/tmp/repo",
        resolved: dict[str, str] | None = None,
    ) -> RepoMirrorContext:
        # RepoMirrorContext derives resolved via property (config + file)
        # We feed mirrors via file_mirrors to keep insertion order realistic.
        return RepoMirrorContext(
            identifier="repo-id",
            repo_dir=repo_dir,
            config_mirrors={},
            file_mirrors=resolved or {},
        )

        repo = {"provider": "github.com", "account": "alice", "repository": "repo"}
    @patch("pkgmgr.actions.mirror.setup_cmd.build_context")
    @patch("pkgmgr.actions.mirror.setup_cmd.ensure_origin_remote")
    def test_setup_mirrors_local_calls_ensure_origin_remote(self, m_ensure, m_ctx) -> None:
        m_ctx.return_value = self._ctx(repo_dir="/tmp/repo", resolved={"primary": "git@x/y.git"})

        repos = [{"provider": "github.com", "account": "alice", "repository": "repo"}]
        setup_mirrors(
            selected_repos=[repo],
            repositories_base_dir="/base",
            all_repos=[repo],
            selected_repos=repos,
            repositories_base_dir="/tmp",
            all_repos=repos,
            preview=True,
            local=True,
            remote=False,
            ensure_remote=False,
        )

        mock_ensure_origin.assert_called_once()
        args, kwargs = mock_ensure_origin.call_args
        self.assertEqual(args[0], repo)
        self.assertEqual(kwargs.get("preview"), True)
        self.assertEqual(m_ensure.call_count, 1)
        args, kwargs = m_ensure.call_args

        # ensure_origin_remote(repo, ctx, preview) may be positional or kw.
        # Accept both to avoid coupling tests to call style.
        if "preview" in kwargs:
            self.assertTrue(kwargs["preview"])
        else:
            # args: (repo, ctx, preview)
            self.assertTrue(args[2])

    @patch("pkgmgr.actions.mirror.setup_cmd.ensure_remote_repository")
    @patch("pkgmgr.actions.mirror.setup_cmd.probe_mirror")
    @patch("pkgmgr.actions.mirror.setup_cmd.build_context")
    def test_setup_mirrors_remote_provisions_when_enabled(
        self,
        mock_build_context,
        mock_probe,
        mock_ensure_remote_repository,
    ) -> None:
        ctx = MagicMock()
        ctx.identifier = "repo-id"
        ctx.repo_dir = "/tmp/repo"
        ctx.config_mirrors = {"origin": "git@github.com:alice/repo.git"}
        ctx.file_mirrors = {}
        type(ctx).resolved_mirrors = PropertyMock(return_value={"origin": "git@github.com:alice/repo.git"})
        mock_build_context.return_value = ctx

        mock_probe.return_value = (True, "")

        repo = {"provider": "github.com", "account": "alice", "repository": "repo"}

        setup_mirrors(
            selected_repos=[repo],
            repositories_base_dir="/base",
            all_repos=[repo],
            preview=False,
            local=False,
            remote=True,
            ensure_remote=True,
        )

        mock_ensure_remote_repository.assert_called_once()
        mock_probe.assert_called_once()

    @patch("pkgmgr.actions.mirror.setup_cmd.ensure_remote_repository")
    @patch("pkgmgr.actions.mirror.setup_cmd.probe_mirror")
    @patch("pkgmgr.actions.mirror.setup_cmd.build_context")
    def test_setup_mirrors_remote_probes_all_resolved_mirrors(
        self,
        mock_build_context,
        mock_probe,
        mock_ensure_remote_repository,
    ) -> None:
        ctx = MagicMock()
        ctx.identifier = "repo-id"
        ctx.repo_dir = "/tmp/repo"
        ctx.config_mirrors = {}
        ctx.file_mirrors = {}
        type(ctx).resolved_mirrors = PropertyMock(
            return_value={
                "mirror": "git@github.com:alice/repo.git",
                "backup": "ssh://git@git.veen.world:2201/alice/repo.git",
            }
        )
        mock_build_context.return_value = ctx

        mock_probe.return_value = (True, "")

        repo = {"provider": "github.com", "account": "alice", "repository": "repo"}
    @patch("pkgmgr.actions.mirror.setup_cmd.determine_primary_remote_url")
    def test_setup_mirrors_remote_no_mirrors_probes_primary(self, m_primary, m_probe, m_ctx) -> None:
        m_ctx.return_value = self._ctx(repo_dir="/tmp/repo", resolved={})
        m_primary.return_value = "git@github.com:alice/repo.git"
        m_probe.return_value = (True, "")

        repos = [{"provider": "github.com", "account": "alice", "repository": "repo"}]
        setup_mirrors(
            selected_repos=[repo],
            repositories_base_dir="/base",
            all_repos=[repo],
            preview=False,
            selected_repos=repos,
            repositories_base_dir="/tmp",
            all_repos=repos,
            preview=True,
            local=False,
            remote=True,
            ensure_remote=False,
        )

        mock_ensure_remote_repository.assert_not_called()
        self.assertEqual(mock_probe.call_count, 2)
        m_primary.assert_called()
        m_probe.assert_called_with("git@github.com:alice/repo.git", "/tmp/repo")

    @patch("pkgmgr.actions.mirror.setup_cmd.build_context")
    @patch("pkgmgr.actions.mirror.setup_cmd.probe_mirror")
    def test_setup_mirrors_remote_with_mirrors_probes_each(self, m_probe, m_ctx) -> None:
        m_ctx.return_value = self._ctx(
            repo_dir="/tmp/repo",
            resolved={
                "origin": "git@github.com:alice/repo.git",
                "backup": "ssh://git@git.veen.world:2201/alice/repo.git",
            },
        )
        m_probe.return_value = (True, "")

        repos = [{"provider": "github.com", "account": "alice", "repository": "repo"}]
        setup_mirrors(
            selected_repos=repos,
            repositories_base_dir="/tmp",
            all_repos=repos,
            preview=True,
            local=False,
            remote=True,
            ensure_remote=False,
        )

        self.assertEqual(m_probe.call_count, 2)


if __name__ == "__main__":

62 tests/unit/pkgmgr/actions/repository/test_create_parsing.py Normal file
@@ -0,0 +1,62 @@
from __future__ import annotations

import unittest

from pkgmgr.actions.repository.create import (
    RepoParts,
    _parse_identifier,
    _parse_git_url,
    _strip_git_suffix,
    _split_host_port,
)


class TestRepositoryCreateParsing(unittest.TestCase):
    def test_strip_git_suffix(self) -> None:
        self.assertEqual(_strip_git_suffix("repo.git"), "repo")
        self.assertEqual(_strip_git_suffix("repo"), "repo")

    def test_split_host_port(self) -> None:
        self.assertEqual(_split_host_port("example.com"), ("example.com", None))
        self.assertEqual(_split_host_port("example.com:2222"), ("example.com", "2222"))
        self.assertEqual(_split_host_port("example.com:"), ("example.com", None))

    def test_parse_identifier_plain(self) -> None:
        parts = _parse_identifier("github.com/owner/repo")
        self.assertIsInstance(parts, RepoParts)
        self.assertEqual(parts.host, "github.com")
        self.assertEqual(parts.port, None)
        self.assertEqual(parts.owner, "owner")
        self.assertEqual(parts.name, "repo")

    def test_parse_identifier_with_port(self) -> None:
        parts = _parse_identifier("gitea.example.com:2222/org/repo")
        self.assertEqual(parts.host, "gitea.example.com")
        self.assertEqual(parts.port, "2222")
        self.assertEqual(parts.owner, "org")
        self.assertEqual(parts.name, "repo")

    def test_parse_git_url_scp_style(self) -> None:
        parts = _parse_git_url("git@github.com:owner/repo.git")
        self.assertEqual(parts.host, "github.com")
        self.assertEqual(parts.port, None)
        self.assertEqual(parts.owner, "owner")
        self.assertEqual(parts.name, "repo")

    def test_parse_git_url_https(self) -> None:
        parts = _parse_git_url("https://github.com/owner/repo.git")
        self.assertEqual(parts.host, "github.com")
        self.assertEqual(parts.port, None)
        self.assertEqual(parts.owner, "owner")
        self.assertEqual(parts.name, "repo")

    def test_parse_git_url_ssh_with_port(self) -> None:
        parts = _parse_git_url("ssh://git@gitea.example.com:2222/org/repo.git")
        self.assertEqual(parts.host, "gitea.example.com")
        self.assertEqual(parts.port, "2222")
        self.assertEqual(parts.owner, "org")
        self.assertEqual(parts.name, "repo")


if __name__ == "__main__":
    unittest.main()

@@ -0,0 +1,35 @@
from __future__ import annotations

import unittest
from unittest.mock import patch

from pkgmgr.actions.repository.scaffold import render_default_templates


class TestScaffoldRenderPreview(unittest.TestCase):
    def test_render_preview_does_not_write(self) -> None:
        with (
            patch("pkgmgr.actions.repository.scaffold._templates_dir", return_value="/tpl"),
            patch("pkgmgr.actions.repository.scaffold.os.path.isdir", return_value=True),
            patch("pkgmgr.actions.repository.scaffold.os.walk", return_value=[("/tpl", [], ["README.md.j2"])]),
            patch("pkgmgr.actions.repository.scaffold.os.path.relpath", return_value="README.md.j2"),
            patch("pkgmgr.actions.repository.scaffold.os.makedirs") as mk,
            patch("pkgmgr.actions.repository.scaffold.open", create=True) as op,
            patch("pkgmgr.actions.repository.scaffold.Environment") as env_cls,
        ):
            env = env_cls.return_value
            env.get_template.return_value.render.return_value = "X"

            render_default_templates(
                "/repo",
                context={"repository": "x"},
                preview=True,
            )

        mk.assert_not_called()
        op.assert_not_called()
        env.get_template.assert_not_called()


if __name__ == "__main__":
    unittest.main()

75 tests/unit/pkgmgr/cli/commands/test_release_publish_hook.py Normal file
@@ -0,0 +1,75 @@
from __future__ import annotations

import tempfile
import unittest
from types import SimpleNamespace
from unittest.mock import patch


class TestCLIReleasePublishHook(unittest.TestCase):
    def _ctx(self) -> SimpleNamespace:
        # Minimal CLIContext shape used by handle_release
        return SimpleNamespace(
            repositories_base_dir="/tmp",
            all_repositories=[],
        )

    def test_release_runs_publish_by_default_and_respects_tty(self) -> None:
        from pkgmgr.cli.commands.release import handle_release

        with tempfile.TemporaryDirectory() as td:
            repo = {"directory": td}

            args = SimpleNamespace(
                list=False,
                release_type="patch",
                message=None,
                preview=False,
                force=False,
                close=False,
                no_publish=False,
            )

            with patch("pkgmgr.cli.commands.release.run_release") as m_release, patch(
                "pkgmgr.cli.commands.release.run_publish"
            ) as m_publish, patch(
                "pkgmgr.cli.commands.release.sys.stdin.isatty", return_value=False
            ):
                handle_release(args=args, ctx=self._ctx(), selected=[repo])

            m_release.assert_called_once()
            m_publish.assert_called_once()

            _, kwargs = m_publish.call_args
            self.assertEqual(kwargs["repo"], repo)
            self.assertEqual(kwargs["repo_dir"], td)
            self.assertFalse(kwargs["interactive"])
            self.assertFalse(kwargs["allow_prompt"])

    def test_release_skips_publish_when_no_publish_flag_set(self) -> None:
        from pkgmgr.cli.commands.release import handle_release

        with tempfile.TemporaryDirectory() as td:
            repo = {"directory": td}

            args = SimpleNamespace(
                list=False,
                release_type="patch",
                message=None,
                preview=False,
                force=False,
                close=False,
                no_publish=True,
            )

            with patch("pkgmgr.cli.commands.release.run_release") as m_release, patch(
                "pkgmgr.cli.commands.release.run_publish"
            ) as m_publish:
                handle_release(args=args, ctx=self._ctx(), selected=[repo])

            m_release.assert_called_once()
            m_publish.assert_not_called()


if __name__ == "__main__":
    unittest.main()