Compare commits

..

14 Commits

Author SHA1 Message Date
Kevin Veen-Birkenbach
be70dd4239 Release version 1.8.1
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / lint-shell (push) Has been cancelled
Mark stable commit / lint-python (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
2025-12-16 18:06:35 +01:00
Kevin Veen-Birkenbach
74876e2e15 Fixed ruff
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / lint-shell (push) Has been cancelled
Mark stable commit / lint-python (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
2025-12-16 18:00:56 +01:00
Kevin Veen-Birkenbach
54058c7f4d gpt-5.2 ChatGPT: integrate gh-based credential resolution with full integration test
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / lint-shell (push) Has been cancelled
Mark stable commit / lint-python (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
- Add GhTokenProvider to read GitHub tokens via `gh auth token`
- Extend TokenResolver policy: ENV → gh → keyring (validate) → prompt (overwrite)
- Introduce provider-specific token validation for GitHub
- Ensure invalid keyring tokens trigger interactive re-prompt and overwrite
- Add end-to-end integration test covering gh → keyring → prompt flow
- Clean up credentials package exports and documentation

https://chatgpt.com/share/69418c81-6748-800f-8fec-616684746e3c
2025-12-16 17:44:44 +01:00
Kevin Veen-Birkenbach
8583fdf172 feat(mirror,create): make MIRRORS single source of truth and exclude PyPI from git config
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / lint-shell (push) Has been cancelled
Mark stable commit / lint-python (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
- Treat MIRRORS as the only authority for mirror URLs
- Filter non-git URLs (e.g. PyPI) from git remotes and push URLs
- Prefer SSH git URLs when determining primary origin
- Ensure mirror probing only targets valid git remotes
- Refactor repository create into service-based architecture
- Write PyPI metadata exclusively to MIRRORS, never to git config
- Add integration test verifying PyPI is not written into .git/config
- Update preview and unit tests to match new create flow

https://chatgpt.com/share/69415c61-1c5c-800f-86dd-0405edec25db
2025-12-16 14:19:19 +01:00
Kevin Veen-Birkenbach
374f4ed745 test(integration): move create repo preview test from e2e and mock git commands
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / lint-shell (push) Has been cancelled
Mark stable commit / lint-python (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
- Reclassify create repo preview test as integration test
- Rename test class to drop E2E naming
- Replace subprocess mock with core.git command mocks (init/add_all/commit)
- Patch get_config_value to avoid git config dependency

https://chatgpt.com/share/694150de-873c-800f-a01d-df3cc7ce25df
2025-12-16 13:30:19 +01:00
Kevin Veen-Birkenbach
63e1b3d145 core.git: add get_repo_root query and use it in repository scaffold
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / lint-shell (push) Has been cancelled
Mark stable commit / lint-python (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
https://chatgpt.com/share/69414f70-fc60-800f-ba6a-cbea426ea913
2025-12-16 13:23:36 +01:00
Kevin Veen-Birkenbach
2f89de1ff5 refactor(pull): switch repository pull to core.git commands
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / lint-shell (push) Has been cancelled
Mark stable commit / lint-python (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
- Replace raw subprocess git pull with core.git.commands.pull_args
- Remove shell-based command execution
- Add GitPullArgsError wrapper for consistent error handling
- Align unit tests to mock pull_args instead of subprocess.run
- Preserve verification and prompt logic

https://chatgpt.com/share/69414dc9-5b30-800f-88b2-bd27a873580b
2025-12-16 13:17:04 +01:00
Kevin Veen-Birkenbach
019aa4b0d9 refactor(git): migrate repository creation to core.git commands
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / lint-shell (push) Has been cancelled
Mark stable commit / lint-python (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
- Replace direct subprocess git calls with core.git commands (init, add_all, commit, branch_move, push_upstream)
- Introduce add_all, init, and branch_move command wrappers with preview support
- Use git config queries via get_config_value instead of shell access
- Preserve main → master fallback logic with explicit error handling
- Improve error transparency while keeping previous non-fatal behavior

https://chatgpt.com/share/69414b77-b4d4-800f-a189-463b489664b3
2025-12-16 13:05:42 +01:00
Kevin Veen-Birkenbach
9c22c7dbb4 refactor(git): introduce structured core.git command/query API and adapt actions & tests
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / lint-shell (push) Has been cancelled
Mark stable commit / lint-python (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
- Replace direct subprocess usage with core.git.run wrapper
- Introduce dedicated core.git.commands (add, commit, fetch, pull_ff_only, push, clone, tag_annotated, tag_force_annotated, etc.)
- Introduce core.git.queries (list_tags, get_upstream_ref, get_config_value, changelog helpers, etc.)
- Refactor release workflow and git_ops to use command/query split
- Implement semantic vX.Y.Z comparison with safe fallback for non-parseable tags
- Refactor repository clone logic to use core.git.commands.clone with preview support and ssh→https fallback
- Remove legacy run_git_command helpers
- Split and update unit tests to mock command/query boundaries instead of subprocess
- Add comprehensive tests for clone modes, preview behavior, ssh→https fallback, and verification prompts
- Add unit tests for core.git.run error handling and preview mode
- Align public exports (__all__) with new structure
- Improve type hints, docstrings, and error specificity across git helpers

https://chatgpt.com/share/69414735-51d4-800f-bc7b-4b90e35f71e5
2025-12-16 12:49:03 +01:00
Kevin Veen-Birkenbach
f83e192e37 refactor(release/git): replace shell git calls with command/query helpers
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / lint-shell (push) Has been cancelled
Mark stable commit / lint-python (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
- Remove legacy shell-based git helpers from release workflow
- Introduce typed git command wrappers (add, commit, fetch, pull_ff_only, push, tag*)
- Add git queries for upstream detection and tag listing
- Refactor release workflow to use core git commands consistently
- Implement semantic vX.Y.Z tag comparison without external sort
- Ensure prerelease tags (e.g. -rc) do not outrank final releases
- Split and update unit tests to match new command/query architecture
2025-12-16 12:30:36 +01:00
Kevin Veen-Birkenbach
486863eb58 Sovled ruff linting hints
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / lint-shell (push) Has been cancelled
Mark stable commit / lint-python (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
2025-12-16 12:04:16 +01:00
Kevin Veen-Birkenbach
bb23bd94f2 refactor(git): add get_latest_commit query and remove subprocess usage
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / lint-shell (push) Has been cancelled
Mark stable commit / lint-python (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
- Introduce core.git query get_latest_commit()
- Refactor config init to use git query instead of subprocess
- Fix __future__ import order in core.git package
- Export new query via core.git.queries API

https://chatgpt.com/share/69413c3e-3bcc-800f-b3b0-a3bf3b7bb875
2025-12-16 12:02:09 +01:00
Kevin Veen-Birkenbach
2a66c082eb gpt-5.2 ChatGPT: move git config lookup into core.git query
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / lint-shell (push) Has been cancelled
Mark stable commit / lint-python (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
- Replace inline `git config --get` subprocess usage in release/files.py
  with core.git.queries.get_config_value()
- Keep core.git.run() strict; interpret exit code 1 for missing config keys
  at the query layer
- Export get_config_value via core.git.queries

https://chatgpt.com/share/69413aef-9814-800f-a9c3-e98666a4204a
2025-12-16 11:56:24 +01:00
Kevin Veen-Birkenbach
ee9d7758ed Solved ruff linting hints
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / lint-shell (push) Has been cancelled
Mark stable commit / lint-python (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
2025-12-16 11:42:40 +01:00
69 changed files with 2361 additions and 1017 deletions

View File

@@ -1,3 +1,11 @@
## [1.8.1] - 2025-12-16
* * Improved stability and consistency of all Git operations (clone, pull, push, release, branch handling) with clearer error messages and predictable preview behavior.
* Mirrors are now handled cleanly: only valid Git remotes are used for Git operations, while non-Git URLs (e.g. PyPI) are excluded, preventing broken or confusing repository configs.
* GitHub authentication is more robust: tokens are automatically resolved via the GitHub CLI (`gh`), invalid stored tokens are replaced, and interactive prompts occur only when necessary.
* Repository creation and release workflows are more reliable, producing cleaner Git configurations and more predictable version handling.
## [1.8.0] - 2025-12-15 ## [1.8.0] - 2025-12-15
* *** New Features: *** * *** New Features: ***

View File

@@ -32,7 +32,7 @@
rec { rec {
pkgmgr = pyPkgs.buildPythonApplication { pkgmgr = pyPkgs.buildPythonApplication {
pname = "package-manager"; pname = "package-manager";
version = "1.8.0"; version = "1.8.1";
# Use the git repo as source # Use the git repo as source
src = ./.; src = ./.;

View File

@@ -1,7 +1,7 @@
# Maintainer: Kevin Veen-Birkenbach <info@veen.world> # Maintainer: Kevin Veen-Birkenbach <info@veen.world>
pkgname=package-manager pkgname=package-manager
pkgver=1.8.0 pkgver=1.8.1
pkgrel=1 pkgrel=1
pkgdesc="Local-flake wrapper for Kevin's package-manager (Nix-based)." pkgdesc="Local-flake wrapper for Kevin's package-manager (Nix-based)."
arch=('any') arch=('any')

View File

@@ -1,3 +1,12 @@
package-manager (1.8.1-1) unstable; urgency=medium
* * Improved stability and consistency of all Git operations (clone, pull, push, release, branch handling) with clearer error messages and predictable preview behavior.
* Mirrors are now handled cleanly: only valid Git remotes are used for Git operations, while non-Git URLs (e.g. PyPI) are excluded, preventing broken or confusing repository configs.
* GitHub authentication is more robust: tokens are automatically resolved via the GitHub CLI (`gh`), invalid stored tokens are replaced, and interactive prompts occur only when necessary.
* Repository creation and release workflows are more reliable, producing cleaner Git configurations and more predictable version handling.
-- Kevin Veen-Birkenbach <kevin@veen.world> Tue, 16 Dec 2025 18:06:35 +0100
package-manager (1.8.0-1) unstable; urgency=medium package-manager (1.8.0-1) unstable; urgency=medium
* *** New Features: *** * *** New Features: ***

View File

@@ -1,5 +1,5 @@
Name: package-manager Name: package-manager
Version: 1.8.0 Version: 1.8.1
Release: 1%{?dist} Release: 1%{?dist}
Summary: Wrapper that runs Kevin's package-manager via Nix flake Summary: Wrapper that runs Kevin's package-manager via Nix flake
@@ -74,6 +74,12 @@ echo ">>> package-manager removed. Nix itself was not removed."
/usr/lib/package-manager/ /usr/lib/package-manager/
%changelog %changelog
* Tue Dec 16 2025 Kevin Veen-Birkenbach <kevin@veen.world> - 1.8.1-1
- * Improved stability and consistency of all Git operations (clone, pull, push, release, branch handling) with clearer error messages and predictable preview behavior.
* Mirrors are now handled cleanly: only valid Git remotes are used for Git operations, while non-Git URLs (e.g. PyPI) are excluded, preventing broken or confusing repository configs.
* GitHub authentication is more robust: tokens are automatically resolved via the GitHub CLI (`gh`), invalid stored tokens are replaced, and interactive prompts occur only when necessary.
* Repository creation and release workflows are more reliable, producing cleaner Git configurations and more predictable version handling.
* Mon Dec 15 2025 Kevin Veen-Birkenbach <kevin@veen.world> - 1.8.0-1 * Mon Dec 15 2025 Kevin Veen-Birkenbach <kevin@veen.world> - 1.8.0-1
- *** New Features: *** - *** New Features: ***
- **Silent Updates**: You can now use the `--silent` flag during installs and updates to suppress error messages for individual repositories and get a single summary at the end. This ensures the process continues even if some repositories fail, while still preserving interactive checks when not in silent mode. - **Silent Updates**: You can now use the `--silent` flag during installs and updates to suppress error messages for individual repositories and get a single summary at the end. This ensures the process continues even if some repositories fail, while still preserving interactive checks when not in silent mode.

View File

@@ -7,7 +7,7 @@ build-backend = "setuptools.build_meta"
[project] [project]
name = "kpmx" name = "kpmx"
version = "1.8.0" version = "1.8.1"
description = "Kevin's package-manager tool (pkgmgr)" description = "Kevin's package-manager tool (pkgmgr)"
readme = "README.md" readme = "README.md"
requires-python = ">=3.9" requires-python = ">=3.9"

View File

@@ -14,7 +14,7 @@ with the expected structure:
For each discovered repository, the function: For each discovered repository, the function:
• derives provider, account, repository from the folder structure • derives provider, account, repository from the folder structure
• (optionally) determines the latest commit hash via git log • (optionally) determines the latest commit hash via git
• generates a unique CLI alias • generates a unique CLI alias
• marks ignore=True for newly discovered repos • marks ignore=True for newly discovered repos
• skips repos already known in defaults or user config • skips repos already known in defaults or user config
@@ -23,11 +23,11 @@ For each discovered repository, the function:
from __future__ import annotations from __future__ import annotations
import os import os
import subprocess
from typing import Any, Dict from typing import Any, Dict
from pkgmgr.core.command.alias import generate_alias from pkgmgr.core.command.alias import generate_alias
from pkgmgr.core.config.save import save_user_config from pkgmgr.core.config.save import save_user_config
from pkgmgr.core.git.queries import get_latest_commit
def config_init( def config_init(
@@ -116,27 +116,18 @@ def config_init(
print(f"[ADD] {provider}/{account}/{repo_name}") print(f"[ADD] {provider}/{account}/{repo_name}")
# Determine commit hash # Determine commit hash via git query
try: verified_commit = get_latest_commit(repo_path) or ""
result = subprocess.run( if verified_commit:
["git", "log", "-1", "--format=%H"], print(f"[INFO] Latest commit: {verified_commit}")
cwd=repo_path, else:
stdout=subprocess.PIPE, print("[WARN] Could not read commit (not a git repo or no commits).")
stderr=subprocess.PIPE,
text=True,
check=True,
)
verified = result.stdout.strip()
print(f"[INFO] Latest commit: {verified}")
except Exception as exc:
verified = ""
print(f"[WARN] Could not read commit: {exc}")
entry = { entry: Dict[str, Any] = {
"provider": provider, "provider": provider,
"account": account, "account": account,
"repository": repo_name, "repository": repo_name,
"verified": {"commit": verified}, "verified": {"commit": verified_commit},
"ignore": True, "ignore": True,
} }

View File

@@ -12,14 +12,38 @@ from pkgmgr.core.git.commands import (
add_remote_push_url, add_remote_push_url,
set_remote_url, set_remote_url,
) )
from pkgmgr.core.git.queries import ( from pkgmgr.core.git.queries import get_remote_push_urls, list_remotes
get_remote_push_urls,
list_remotes,
)
from .types import MirrorMap, RepoMirrorContext, Repository from .types import MirrorMap, RepoMirrorContext, Repository
def _is_git_remote_url(url: str) -> bool:
"""
True only for URLs that should become git remotes / push URLs.
Accepted:
- git@host:owner/repo(.git) (SCP-like SSH)
- ssh://git@host(:port)/owner/repo(.git) (SSH URL)
- https://host/owner/repo.git (HTTPS git remote)
- http://host/owner/repo.git (rare, but possible)
Everything else (e.g. PyPI project page) stays metadata only.
"""
u = (url or "").strip()
if not u:
return False
if u.startswith("git@"):
return True
if u.startswith("ssh://"):
return True
if (u.startswith("https://") or u.startswith("http://")) and u.endswith(".git"):
return True
return False
def build_default_ssh_url(repo: Repository) -> Optional[str]: def build_default_ssh_url(repo: Repository) -> Optional[str]:
provider = repo.get("provider") provider = repo.get("provider")
account = repo.get("account") account = repo.get("account")
@@ -35,25 +59,29 @@ def build_default_ssh_url(repo: Repository) -> Optional[str]:
return f"git@{provider}:{account}/{name}.git" return f"git@{provider}:{account}/{name}.git"
def _git_mirrors_only(m: MirrorMap) -> MirrorMap:
return {k: v for k, v in m.items() if v and _is_git_remote_url(v)}
def determine_primary_remote_url( def determine_primary_remote_url(
repo: Repository, repo: Repository,
ctx: RepoMirrorContext, ctx: RepoMirrorContext,
) -> Optional[str]: ) -> Optional[str]:
""" """
Priority order: Priority order (GIT URLS ONLY):
1. origin from resolved mirrors 1. origin from resolved mirrors (if it is a git URL)
2. MIRRORS file order 2. first git URL from MIRRORS file (in file order)
3. config mirrors order 3. first git URL from config mirrors (in config order)
4. default SSH URL 4. default SSH URL
""" """
resolved = ctx.resolved_mirrors resolved = ctx.resolved_mirrors
origin = resolved.get("origin")
if resolved.get("origin"): if origin and _is_git_remote_url(origin):
return resolved["origin"] return origin
for mirrors in (ctx.file_mirrors, ctx.config_mirrors): for mirrors in (ctx.file_mirrors, ctx.config_mirrors):
for _, url in mirrors.items(): for _, url in mirrors.items():
if url: if url and _is_git_remote_url(url):
return url return url
return build_default_ssh_url(repo) return build_default_ssh_url(repo)
@@ -82,10 +110,13 @@ def _ensure_additional_push_urls(
preview: bool, preview: bool,
) -> None: ) -> None:
""" """
Ensure all mirror URLs (except primary) are configured as additional push URLs for origin. Ensure all *git* mirror URLs (except primary) are configured as additional
Preview is handled by the underlying git runner. push URLs for origin.
Non-git URLs (like PyPI) are ignored and will never land in git config.
""" """
desired: Set[str] = {u for u in mirrors.values() if u and u != primary} git_only = _git_mirrors_only(mirrors)
desired: Set[str] = {u for u in git_only.values() if u and u != primary}
if not desired: if not desired:
return return
@@ -110,8 +141,8 @@ def ensure_origin_remote(
return return
primary = determine_primary_remote_url(repo, ctx) primary = determine_primary_remote_url(repo, ctx)
if not primary: if not primary or not _is_git_remote_url(primary):
print("[WARN] No primary mirror URL could be determined.") print("[WARN] No valid git primary mirror URL could be determined.")
return return
# 1) Ensure origin exists # 1) Ensure origin exists
@@ -122,14 +153,13 @@ def ensure_origin_remote(
print(f"[WARN] Failed to add origin remote: {exc}") print(f"[WARN] Failed to add origin remote: {exc}")
return # without origin we cannot reliably proceed return # without origin we cannot reliably proceed
# 2) Ensure origin fetch+push URLs are correct (ALWAYS, even if origin already existed) # 2) Ensure origin fetch+push URLs are correct
try: try:
_set_origin_fetch_and_push(repo_dir, primary, preview) _set_origin_fetch_and_push(repo_dir, primary, preview)
except GitSetRemoteUrlError as exc: except GitSetRemoteUrlError as exc:
# Do not abort: still try to add additional push URLs
print(f"[WARN] Failed to set origin URLs: {exc}") print(f"[WARN] Failed to set origin URLs: {exc}")
# 3) Ensure additional push URLs for mirrors # 3) Ensure additional push URLs for mirrors (git urls only)
try: try:
_ensure_additional_push_urls(repo_dir, ctx.resolved_mirrors, primary, preview) _ensure_additional_push_urls(repo_dir, ctx.resolved_mirrors, primary, preview)
except GitAddRemotePushUrlError as exc: except GitAddRemotePushUrlError as exc:

View File

@@ -2,13 +2,29 @@ from __future__ import annotations
from typing import List from typing import List
from pkgmgr.core.git.queries import probe_remote_reachable
from .context import build_context from .context import build_context
from .git_remote import ensure_origin_remote, determine_primary_remote_url from .git_remote import ensure_origin_remote, determine_primary_remote_url
from pkgmgr.core.git.queries import probe_remote_reachable
from .remote_provision import ensure_remote_repository from .remote_provision import ensure_remote_repository
from .types import Repository from .types import Repository
def _is_git_remote_url(url: str) -> bool:
# Keep the same filtering semantics as in git_remote.py (duplicated on purpose
# to keep setup_cmd independent of private helpers).
u = (url or "").strip()
if not u:
return False
if u.startswith("git@"):
return True
if u.startswith("ssh://"):
return True
if (u.startswith("https://") or u.startswith("http://")) and u.endswith(".git"):
return True
return False
def _setup_local_mirrors_for_repo( def _setup_local_mirrors_for_repo(
repo: Repository, repo: Repository,
repositories_base_dir: str, repositories_base_dir: str,
@@ -48,16 +64,23 @@ def _setup_remote_mirrors_for_repo(
preview, preview,
) )
if not ctx.resolved_mirrors: # Probe only git URLs (do not try ls-remote against PyPI etc.)
# If there are no mirrors at all, probe the primary git URL.
git_mirrors = {k: v for k, v in ctx.resolved_mirrors.items() if _is_git_remote_url(v)}
if not git_mirrors:
primary = determine_primary_remote_url(repo, ctx) primary = determine_primary_remote_url(repo, ctx)
if not primary: if not primary or not _is_git_remote_url(primary):
print("[INFO] No git mirrors to probe.")
print()
return return
ok = probe_remote_reachable(primary, cwd=ctx.repo_dir) ok = probe_remote_reachable(primary, cwd=ctx.repo_dir)
print("[OK]" if ok else "[WARN]", primary) print("[OK]" if ok else "[WARN]", primary)
print() print()
return return
for name, url in ctx.resolved_mirrors.items(): for name, url in git_mirrors.items():
ok = probe_remote_reachable(url, cwd=ctx.repo_dir) ok = probe_remote_reachable(url, cwd=ctx.repo_dir)
print(f"[OK] {name}: {url}" if ok else f"[WARN] {name}: {url}") print(f"[OK] {name}: {url}" if ok else f"[WARN] {name}: {url}")

View File

@@ -24,6 +24,8 @@ import tempfile
from datetime import date, datetime from datetime import date, datetime
from typing import Optional, Tuple from typing import Optional, Tuple
from pkgmgr.core.git.queries import get_config_value
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Editor helper for interactive changelog messages # Editor helper for interactive changelog messages
@@ -74,10 +76,7 @@ def _open_editor_for_changelog(initial_message: Optional[str] = None) -> str:
except OSError: except OSError:
pass pass
lines = [ lines = [line for line in content.splitlines() if not line.strip().startswith("#")]
line for line in content.splitlines()
if not line.strip().startswith("#")
]
return "\n".join(lines).strip() return "\n".join(lines).strip()
@@ -85,6 +84,7 @@ def _open_editor_for_changelog(initial_message: Optional[str] = None) -> str:
# File update helpers (pyproject + extra packaging + changelog) # File update helpers (pyproject + extra packaging + changelog)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def update_pyproject_version( def update_pyproject_version(
pyproject_path: str, pyproject_path: str,
new_version: str, new_version: str,
@@ -365,24 +365,6 @@ def update_changelog(
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def _get_git_config_value(key: str) -> Optional[str]:
"""
Try to read a value from `git config --get <key>`.
"""
try:
result = subprocess.run(
["git", "config", "--get", key],
capture_output=True,
text=True,
check=False,
)
except Exception:
return None
value = result.stdout.strip()
return value or None
def _get_debian_author() -> Tuple[str, str]: def _get_debian_author() -> Tuple[str, str]:
""" """
Determine the maintainer name/email for debian/changelog entries. Determine the maintainer name/email for debian/changelog entries.
@@ -396,9 +378,9 @@ def _get_debian_author() -> Tuple[str, str]:
email = os.environ.get("GIT_AUTHOR_EMAIL") email = os.environ.get("GIT_AUTHOR_EMAIL")
if not name: if not name:
name = _get_git_config_value("user.name") name = get_config_value("user.name")
if not email: if not email:
email = _get_git_config_value("user.email") email = get_config_value("user.email")
if not name: if not name:
name = "Unknown Maintainer" name = "Unknown Maintainer"

View File

@@ -1,73 +1,90 @@
from __future__ import annotations from __future__ import annotations
import subprocess from pkgmgr.core.git.commands import (
fetch,
from pkgmgr.core.git import GitError pull_ff_only,
push,
tag_force_annotated,
def run_git_command(cmd: str) -> None:
print(f"[GIT] {cmd}")
try:
subprocess.run(
cmd,
shell=True,
check=True,
text=True,
capture_output=True,
) )
except subprocess.CalledProcessError as exc: from pkgmgr.core.git.queries import get_upstream_ref, list_tags
print(f"[ERROR] Git command failed: {cmd}")
print(f" Exit code: {exc.returncode}")
if exc.stdout:
print("\n" + exc.stdout)
if exc.stderr:
print("\n" + exc.stderr)
raise GitError(f"Git command failed: {cmd}") from exc
def _capture(cmd: str) -> str: def ensure_clean_and_synced(*, preview: bool = False) -> None:
res = subprocess.run(cmd, shell=True, check=False, capture_output=True, text=True)
return (res.stdout or "").strip()
def ensure_clean_and_synced(preview: bool = False) -> None:
""" """
Always run a pull BEFORE modifying anything. Always run a pull BEFORE modifying anything.
Uses --ff-only to avoid creating merge commits automatically. Uses --ff-only to avoid creating merge commits automatically.
If no upstream is configured, we skip. If no upstream is configured, we skip.
""" """
upstream = _capture("git rev-parse --abbrev-ref --symbolic-full-name @{u} 2>/dev/null") upstream = get_upstream_ref()
if not upstream: if not upstream:
print("[INFO] No upstream configured for current branch. Skipping pull.") print("[INFO] No upstream configured for current branch. Skipping pull.")
return return
if preview:
print("[PREVIEW] Would run: git fetch origin --prune --tags --force")
print("[PREVIEW] Would run: git pull --ff-only")
return
print("[INFO] Syncing with remote before making any changes...") print("[INFO] Syncing with remote before making any changes...")
run_git_command("git fetch origin --prune --tags --force")
run_git_command("git pull --ff-only") # Mirrors old behavior:
# git fetch origin --prune --tags --force
# git pull --ff-only
fetch(remote="origin", prune=True, tags=True, force=True, preview=preview)
pull_ff_only(preview=preview)
def _parse_v_tag(tag: str) -> tuple[int, ...] | None:
"""
Parse tags like 'v1.2.3' into (1, 2, 3).
Returns None if parsing is not possible.
"""
if not tag.startswith("v"):
return None
raw = tag[1:]
if not raw:
return None
parts = raw.split(".")
out: list[int] = []
for p in parts:
if not p.isdigit():
return None
out.append(int(p))
return tuple(out) if out else None
def is_highest_version_tag(tag: str) -> bool: def is_highest_version_tag(tag: str) -> bool:
""" """
Return True if `tag` is the highest version among all tags matching v*. Return True if `tag` is the highest version among all tags matching v*.
Comparison uses `sort -V` for natural version ordering.
We avoid shelling out to `sort -V` and implement a small vX.Y.Z parser.
Non-parseable v* tags are ignored for version comparison.
""" """
all_v = _capture("git tag --list 'v*'") all_v = list_tags("v*")
if not all_v: if not all_v:
return True # No tags yet, so the current tag is the highest return True # No tags yet -> current is highest by definition
# Get the latest tag in natural version order parsed_current = _parse_v_tag(tag)
latest = _capture("git tag --list 'v*' | sort -V | tail -n1") if parsed_current is None:
print(f"[INFO] Latest tag: {latest}, Current tag: {tag}") # If the "current" tag isn't parseable, fall back to conservative behavior:
# treat it as highest only if it matches the max lexicographically.
latest_lex = max(all_v)
print(f"[INFO] Latest tag (lex): {latest_lex}, Current tag: {tag}")
return tag >= latest_lex
# Ensure that the current tag is always considered the highest if it's the latest one parsed_all: list[tuple[int, ...]] = []
return tag >= latest # Use comparison operator to consider all future tags for t in all_v:
parsed = _parse_v_tag(t)
if parsed is not None:
parsed_all.append(parsed)
if not parsed_all:
# No parseable tags -> nothing to compare against
return True
latest = max(parsed_all)
print(f"[INFO] Latest tag (parsed): v{'.'.join(map(str, latest))}, Current tag: {tag}")
return parsed_current >= latest
def update_latest_tag(new_tag: str, preview: bool = False) -> None: def update_latest_tag(new_tag: str, *, preview: bool = False) -> None:
""" """
Move the floating 'latest' tag to the newly created release tag. Move the floating 'latest' tag to the newly created release tag.
@@ -78,15 +95,10 @@ def update_latest_tag(new_tag: str, preview: bool = False) -> None:
target_ref = f"{new_tag}^{{}}" target_ref = f"{new_tag}^{{}}"
print(f"[INFO] Updating 'latest' tag to point at {new_tag} (commit {target_ref})...") print(f"[INFO] Updating 'latest' tag to point at {new_tag} (commit {target_ref})...")
if preview: tag_force_annotated(
print( name="latest",
f'[PREVIEW] Would run: git tag -f -a latest {target_ref} ' target=target_ref,
f'-m "Floating latest tag for {new_tag}"' message=f"Floating latest tag for {new_tag}",
preview=preview,
) )
print("[PREVIEW] Would run: git push origin latest --force") push("origin", "latest", force=True, preview=preview)
return
run_git_command(
f'git tag -f -a latest {target_ref} -m "Floating latest tag for {new_tag}"'
)
run_git_command("git push origin latest --force")

View File

@@ -6,6 +6,7 @@ from typing import Optional
from pkgmgr.actions.branch import close_branch from pkgmgr.actions.branch import close_branch
from pkgmgr.core.git import GitError from pkgmgr.core.git import GitError
from pkgmgr.core.git.commands import add, commit, push, tag_annotated
from pkgmgr.core.git.queries import get_current_branch from pkgmgr.core.git.queries import get_current_branch
from pkgmgr.core.repository.paths import resolve_repo_paths from pkgmgr.core.repository.paths import resolve_repo_paths
@@ -21,7 +22,6 @@ from .files import (
from .git_ops import ( from .git_ops import (
ensure_clean_and_synced, ensure_clean_and_synced,
is_highest_version_tag, is_highest_version_tag,
run_git_command,
update_latest_tag, update_latest_tag,
) )
from .prompts import confirm_proceed_release, should_delete_branch from .prompts import confirm_proceed_release, should_delete_branch
@@ -126,12 +126,11 @@ def _release_impl(
existing_files = [p for p in files_to_add if isinstance(p, str) and p and os.path.exists(p)] existing_files = [p for p in files_to_add if isinstance(p, str) and p and os.path.exists(p)]
if preview: if preview:
for path in existing_files: add(existing_files, preview=True)
print(f"[PREVIEW] Would run: git add {path}") commit(commit_msg, all=True, preview=True)
print(f'[PREVIEW] Would run: git commit -am "{commit_msg}"') tag_annotated(new_tag, tag_msg, preview=True)
print(f'[PREVIEW] Would run: git tag -a {new_tag} -m "{tag_msg}"') push("origin", branch, preview=True)
print(f"[PREVIEW] Would run: git push origin {branch}") push("origin", new_tag, preview=True)
print(f"[PREVIEW] Would run: git push origin {new_tag}")
if is_highest_version_tag(new_tag): if is_highest_version_tag(new_tag):
update_latest_tag(new_tag, preview=True) update_latest_tag(new_tag, preview=True)
@@ -145,15 +144,13 @@ def _release_impl(
print(f"[PREVIEW] Would ask whether to delete branch {branch} after release.") print(f"[PREVIEW] Would ask whether to delete branch {branch} after release.")
return return
for path in existing_files: add(existing_files, preview=False)
run_git_command(f"git add {path}") commit(commit_msg, all=True, preview=False)
tag_annotated(new_tag, tag_msg, preview=False)
run_git_command(f'git commit -am "{commit_msg}"')
run_git_command(f'git tag -a {new_tag} -m "{tag_msg}"')
# Push branch and ONLY the newly created version tag (no --tags) # Push branch and ONLY the newly created version tag (no --tags)
run_git_command(f"git push origin {branch}") push("origin", branch, preview=False)
run_git_command(f"git push origin {new_tag}") push("origin", new_tag, preview=False)
# Update 'latest' only if this is the highest version tag # Update 'latest' only if this is the highest version tag
try: try:

View File

@@ -1,103 +1,132 @@
import subprocess from __future__ import annotations
import os import os
from typing import Any, Dict, List, Optional
from pkgmgr.core.git.commands import clone as git_clone, GitCloneError
from pkgmgr.core.repository.dir import get_repo_dir from pkgmgr.core.repository.dir import get_repo_dir
from pkgmgr.core.repository.identifier import get_repo_identifier from pkgmgr.core.repository.identifier import get_repo_identifier
from pkgmgr.core.repository.verify import verify_repository from pkgmgr.core.repository.verify import verify_repository
Repository = Dict[str, Any]
def _build_clone_url(repo: Repository, clone_mode: str) -> Optional[str]:
provider = repo.get("provider")
account = repo.get("account")
name = repo.get("repository")
replacement = repo.get("replacement")
if clone_mode == "ssh":
if not provider or not account or not name:
return None
return f"git@{provider}:{account}/{name}.git"
if clone_mode in ("https", "shallow"):
if replacement:
return f"https://{replacement}.git"
if not provider or not account or not name:
return None
return f"https://{provider}/{account}/{name}.git"
return None
def clone_repos( def clone_repos(
selected_repos, selected_repos: List[Repository],
repositories_base_dir: str, repositories_base_dir: str,
all_repos, all_repos: List[Repository],
preview: bool, preview: bool,
no_verification: bool, no_verification: bool,
clone_mode: str clone_mode: str,
): ) -> None:
for repo in selected_repos: for repo in selected_repos:
repo_identifier = get_repo_identifier(repo, all_repos) repo_identifier = get_repo_identifier(repo, all_repos)
repo_dir = get_repo_dir(repositories_base_dir, repo) repo_dir = get_repo_dir(repositories_base_dir, repo)
if os.path.exists(repo_dir): if os.path.exists(repo_dir):
print(f"[INFO] Repository '{repo_identifier}' already exists at '{repo_dir}'. Skipping clone.") print(
f"[INFO] Repository '{repo_identifier}' already exists at '{repo_dir}'. Skipping clone."
)
continue continue
parent_dir = os.path.dirname(repo_dir) parent_dir = os.path.dirname(repo_dir)
os.makedirs(parent_dir, exist_ok=True) os.makedirs(parent_dir, exist_ok=True)
# Build clone URL based on the clone_mode
# Build clone URL based on the clone_mode clone_url = _build_clone_url(repo, clone_mode)
if clone_mode == "ssh": if not clone_url:
clone_url = ( print(f"[WARNING] Cannot build clone URL for '{repo_identifier}'. Skipping.")
f"git@{repo.get('provider')}:"
f"{repo.get('account')}/"
f"{repo.get('repository')}.git"
)
elif clone_mode in ("https", "shallow"):
# Use replacement if defined, otherwise construct from provider/account/repository
if repo.get("replacement"):
clone_url = f"https://{repo.get('replacement')}.git"
else:
clone_url = (
f"https://{repo.get('provider')}/"
f"{repo.get('account')}/"
f"{repo.get('repository')}.git"
)
else:
print(f"Unknown clone mode '{clone_mode}'. Aborting clone for {repo_identifier}.")
continue continue
# Build base clone command shallow = clone_mode == "shallow"
base_clone_cmd = "git clone" mode_label = "HTTPS (shallow)" if shallow else clone_mode.upper()
if clone_mode == "shallow":
# Shallow clone: only latest state via HTTPS, no full history
base_clone_cmd += " --depth 1 --single-branch"
mode_label = "HTTPS (shallow)" if clone_mode == "shallow" else clone_mode.upper()
print( print(
f"[INFO] Attempting to clone '{repo_identifier}' using {mode_label} " f"[INFO] Attempting to clone '{repo_identifier}' using {mode_label} "
f"from {clone_url} into '{repo_dir}'." f"from {clone_url} into '{repo_dir}'."
) )
if preview: try:
print(f"[Preview] Would run: {base_clone_cmd} {clone_url} {repo_dir} in {parent_dir}") args = []
result = subprocess.CompletedProcess(args=[], returncode=0) if shallow:
else: args += ["--depth", "1", "--single-branch"]
result = subprocess.run( args += [clone_url, repo_dir]
f"{base_clone_cmd} {clone_url} {repo_dir}",
git_clone(
args,
cwd=parent_dir, cwd=parent_dir,
shell=True, preview=preview,
) )
if result.returncode != 0: except GitCloneError as exc:
# Only offer fallback if the original mode was SSH. if clone_mode != "ssh":
if clone_mode == "ssh": print(f"[WARNING] Clone failed for '{repo_identifier}': {exc}")
print(f"[WARNING] SSH clone failed for '{repo_identifier}' with return code {result.returncode}.") continue
choice = input("Do you want to attempt HTTPS clone instead? (y/N): ").strip().lower()
if choice == 'y': print(f"[WARNING] SSH clone failed for '{repo_identifier}': {exc}")
# Attempt HTTPS clone choice = input("Do you want to attempt HTTPS clone instead? (y/N): ").strip().lower()
if repo.get("replacement"): if choice != "y":
clone_url = f"https://{repo.get('replacement')}.git" print(f"[INFO] HTTPS clone not attempted for '{repo_identifier}'.")
else: continue
clone_url = f"https://{repo.get('provider')}/{repo.get('account')}/{repo.get('repository')}.git"
print(f"[INFO] Attempting to clone '{repo_identifier}' using HTTPS from {clone_url} into '{repo_dir}'.") fallback_url = _build_clone_url(repo, "https")
if preview: if not fallback_url:
print(f"[Preview] Would run: git clone {clone_url} {repo_dir} in {parent_dir}") print(f"[WARNING] Cannot build HTTPS URL for '{repo_identifier}'.")
result = subprocess.CompletedProcess(args=[], returncode=0) continue
else:
result = subprocess.run(f"git clone {clone_url} {repo_dir}", cwd=parent_dir, shell=True) print(
else: f"[INFO] Attempting to clone '{repo_identifier}' using HTTPS "
print(f"[INFO] HTTPS clone not attempted for '{repo_identifier}'.") f"from {fallback_url} into '{repo_dir}'."
continue )
else:
# For https mode, do not attempt fallback. try:
print(f"[WARNING] HTTPS clone failed for '{repo_identifier}' with return code {result.returncode}.") git_clone(
[fallback_url, repo_dir],
cwd=parent_dir,
preview=preview,
)
except GitCloneError as exc2:
print(f"[WARNING] HTTPS clone failed for '{repo_identifier}': {exc2}")
continue continue
# After cloning, perform verification in local mode.
verified_info = repo.get("verified") verified_info = repo.get("verified")
if verified_info: if not verified_info:
verified_ok, errors, commit_hash, signing_key = verify_repository(repo, repo_dir, mode="local", no_verification=no_verification) continue
if not no_verification and not verified_ok:
verified_ok, errors, _commit_hash, _signing_key = verify_repository(
repo,
repo_dir,
mode="local",
no_verification=no_verification,
)
if no_verification or verified_ok:
continue
print(f"Warning: Verification failed for {repo_identifier} after cloning:") print(f"Warning: Verification failed for {repo_identifier} after cloning:")
for err in errors: for err in errors:
print(f" - {err}") print(f" - {err}")
choice = input("Proceed anyway? (y/N): ").strip().lower() choice = input("Proceed anyway? (y/N): ").strip().lower()
if choice != "y": if choice != "y":
print(f"Skipping repository {repo_identifier} due to failed verification.") print(f"Skipping repository {repo_identifier} due to failed verification.")

View File

@@ -1,257 +0,0 @@
from __future__ import annotations
import os
import re
import subprocess
from dataclasses import dataclass
from typing import Any, Dict, Optional, Tuple
from urllib.parse import urlparse
import yaml
from pkgmgr.actions.mirror.io import write_mirrors_file
from pkgmgr.actions.mirror.setup_cmd import setup_mirrors
from pkgmgr.actions.repository.scaffold import render_default_templates
from pkgmgr.core.command.alias import generate_alias
from pkgmgr.core.config.save import save_user_config
Repository = Dict[str, Any]
_NAME_RE = re.compile(r"^[a-z0-9_-]+$")
@dataclass(frozen=True)
class RepoParts:
host: str
port: Optional[str]
owner: str
name: str
def _run(cmd: str, cwd: str, preview: bool) -> None:
if preview:
print(f"[Preview] Would run in {cwd}: {cmd}")
return
subprocess.run(cmd, cwd=cwd, shell=True, check=True)
def _git_get(key: str) -> str:
try:
out = subprocess.run(
f"git config --get {key}",
shell=True,
check=False,
capture_output=True,
text=True,
)
return (out.stdout or "").strip()
except Exception:
return ""
def _split_host_port(host_with_port: str) -> Tuple[str, Optional[str]]:
if ":" in host_with_port:
host, port = host_with_port.split(":", 1)
return host, port or None
return host_with_port, None
def _strip_git_suffix(name: str) -> str:
return name[:-4] if name.endswith(".git") else name
def _parse_git_url(url: str) -> RepoParts:
if url.startswith("git@") and "://" not in url:
left, right = url.split(":", 1)
host = left.split("@", 1)[1]
path = right.lstrip("/")
owner, name = path.split("/", 1)
return RepoParts(host=host, port=None, owner=owner, name=_strip_git_suffix(name))
parsed = urlparse(url)
host = (parsed.hostname or "").strip()
port = str(parsed.port) if parsed.port else None
path = (parsed.path or "").strip("/")
if not host or not path or "/" not in path:
raise ValueError(f"Could not parse git URL: {url}")
owner, name = path.split("/", 1)
return RepoParts(host=host, port=port, owner=owner, name=_strip_git_suffix(name))
def _parse_identifier(identifier: str) -> RepoParts:
ident = identifier.strip()
if "://" in ident or ident.startswith("git@"):
return _parse_git_url(ident)
parts = ident.split("/")
if len(parts) != 3:
raise ValueError("Identifier must be URL or 'provider(:port)/owner/repo'.")
host_with_port, owner, name = parts
host, port = _split_host_port(host_with_port)
return RepoParts(host=host, port=port, owner=owner, name=name)
def _ensure_valid_repo_name(name: str) -> None:
if not name or not _NAME_RE.fullmatch(name):
raise ValueError("Repository name must match: lowercase a-z, 0-9, '_' and '-'.")
def _repo_homepage(host: str, owner: str, name: str) -> str:
return f"https://{host}/{owner}/{name}"
def _build_default_primary_url(parts: RepoParts) -> str:
if parts.port:
return f"ssh://git@{parts.host}:{parts.port}/{parts.owner}/{parts.name}.git"
return f"git@{parts.host}:{parts.owner}/{parts.name}.git"
def _write_default_mirrors(repo_dir: str, primary: str, name: str, preview: bool) -> None:
mirrors = {"origin": primary, "pypi": f"https://pypi.org/project/{name}/"}
write_mirrors_file(repo_dir, mirrors, preview=preview)
def _git_init_and_initial_commit(repo_dir: str, preview: bool) -> None:
_run("git init", cwd=repo_dir, preview=preview)
_run("git add -A", cwd=repo_dir, preview=preview)
if preview:
print(f'[Preview] Would run in {repo_dir}: git commit -m "Initial commit"')
return
subprocess.run('git commit -m "Initial commit"', cwd=repo_dir, shell=True, check=False)
def _git_push_main_or_master(repo_dir: str, preview: bool) -> None:
_run("git branch -M main", cwd=repo_dir, preview=preview)
try:
_run("git push -u origin main", cwd=repo_dir, preview=preview)
return
except subprocess.CalledProcessError:
pass
try:
_run("git branch -M master", cwd=repo_dir, preview=preview)
_run("git push -u origin master", cwd=repo_dir, preview=preview)
except subprocess.CalledProcessError as exc:
print(f"[WARN] Push failed: {exc}")
def create_repo(
identifier: str,
config_merged: Dict[str, Any],
user_config_path: str,
bin_dir: str,
*,
remote: bool = False,
preview: bool = False,
) -> None:
parts = _parse_identifier(identifier)
_ensure_valid_repo_name(parts.name)
directories = config_merged.get("directories") or {}
base_dir = os.path.expanduser(str(directories.get("repositories", "~/Repositories")))
repo_dir = os.path.join(base_dir, parts.host, parts.owner, parts.name)
author_name = _git_get("user.name") or "Unknown Author"
author_email = _git_get("user.email") or "unknown@example.invalid"
homepage = _repo_homepage(parts.host, parts.owner, parts.name)
primary_url = _build_default_primary_url(parts)
repositories = config_merged.get("repositories") or []
exists = any(
(
r.get("provider") == parts.host
and r.get("account") == parts.owner
and r.get("repository") == parts.name
)
for r in repositories
)
if not exists:
new_entry: Repository = {
"provider": parts.host,
"port": parts.port,
"account": parts.owner,
"repository": parts.name,
"homepage": homepage,
"alias": generate_alias(
{"repository": parts.name, "provider": parts.host, "account": parts.owner},
bin_dir,
existing_aliases=set(),
),
"verified": {},
}
if os.path.exists(user_config_path):
with open(user_config_path, "r", encoding="utf-8") as f:
user_config = yaml.safe_load(f) or {}
else:
user_config = {"repositories": []}
user_config.setdefault("repositories", [])
user_config["repositories"].append(new_entry)
if preview:
print(f"[Preview] Would save user config: {user_config_path}")
else:
save_user_config(user_config, user_config_path)
config_merged.setdefault("repositories", []).append(new_entry)
repo = new_entry
print(f"[INFO] Added repository to configuration: {parts.host}/{parts.owner}/{parts.name}")
else:
repo = next(
r
for r in repositories
if (
r.get("provider") == parts.host
and r.get("account") == parts.owner
and r.get("repository") == parts.name
)
)
print(f"[INFO] Repository already in configuration: {parts.host}/{parts.owner}/{parts.name}")
if preview:
print(f"[Preview] Would ensure directory exists: {repo_dir}")
else:
os.makedirs(repo_dir, exist_ok=True)
tpl_context = {
"provider": parts.host,
"port": parts.port,
"account": parts.owner,
"repository": parts.name,
"homepage": homepage,
"author_name": author_name,
"author_email": author_email,
"license_text": f"All rights reserved by {author_name}",
"primary_remote": primary_url,
}
render_default_templates(repo_dir, context=tpl_context, preview=preview)
_git_init_and_initial_commit(repo_dir, preview=preview)
_write_default_mirrors(repo_dir, primary=primary_url, name=parts.name, preview=preview)
repo.setdefault("mirrors", {})
repo["mirrors"].setdefault("origin", primary_url)
repo["mirrors"].setdefault("pypi", f"https://pypi.org/project/{parts.name}/")
setup_mirrors(
selected_repos=[repo],
repositories_base_dir=base_dir,
all_repos=config_merged.get("repositories", []),
preview=preview,
local=True,
remote=True,
ensure_remote=bool(remote),
)
if remote:
_git_push_main_or_master(repo_dir, preview=preview)

View File

@@ -0,0 +1,28 @@
from __future__ import annotations
from typing import Any, Dict
from .service import CreateRepoService
RepositoryConfig = Dict[str, Any]
__all__ = [
"CreateRepoService",
"create_repo",
]
def create_repo(
identifier: str,
config_merged: RepositoryConfig,
user_config_path: str,
bin_dir: str,
*,
remote: bool = False,
preview: bool = False,
) -> None:
CreateRepoService(
config_merged=config_merged,
user_config_path=user_config_path,
bin_dir=bin_dir,
).run(identifier=identifier, preview=preview, remote=remote)

View File

@@ -0,0 +1,84 @@
from __future__ import annotations
import os
from typing import Dict, Any, Set
import yaml
from pkgmgr.core.command.alias import generate_alias
from pkgmgr.core.config.save import save_user_config
Repository = Dict[str, Any]
class ConfigRepoWriter:
def __init__(
self,
*,
config_merged: Dict[str, Any],
user_config_path: str,
bin_dir: str,
):
self.config_merged = config_merged
self.user_config_path = user_config_path
self.bin_dir = bin_dir
def ensure_repo_entry(
self,
*,
host: str,
port: str | None,
owner: str,
name: str,
homepage: str,
preview: bool,
) -> Repository:
repositories = self.config_merged.setdefault("repositories", [])
for repo in repositories:
if (
repo.get("provider") == host
and repo.get("account") == owner
and repo.get("repository") == name
):
return repo
existing_aliases: Set[str] = {
str(r.get("alias")) for r in repositories if r.get("alias")
}
repo: Repository = {
"provider": host,
"port": port,
"account": owner,
"repository": name,
"homepage": homepage,
"alias": generate_alias(
{
"repository": name,
"provider": host,
"account": owner,
},
self.bin_dir,
existing_aliases=existing_aliases,
),
"verified": {},
}
if preview:
print(f"[Preview] Would add repository to config: {repo}")
return repo
if os.path.exists(self.user_config_path):
with open(self.user_config_path, "r", encoding="utf-8") as f:
user_cfg = yaml.safe_load(f) or {}
else:
user_cfg = {}
user_cfg.setdefault("repositories", []).append(repo)
save_user_config(user_cfg, self.user_config_path)
repositories.append(repo)
print(f"[INFO] Added repository to configuration: {host}/{owner}/{name}")
return repo

View File

@@ -0,0 +1,35 @@
from __future__ import annotations
from pkgmgr.core.git.commands import (
GitCommitError,
GitPushUpstreamError,
add_all,
branch_move,
commit,
init,
push_upstream,
)
class GitBootstrapper:
def init_repo(self, repo_dir: str, preview: bool) -> None:
init(cwd=repo_dir, preview=preview)
add_all(cwd=repo_dir, preview=preview)
try:
commit("Initial commit", cwd=repo_dir, preview=preview)
except GitCommitError as exc:
print(f"[WARN] Initial commit failed (continuing): {exc}")
def push_default_branch(self, repo_dir: str, preview: bool) -> None:
try:
branch_move("main", cwd=repo_dir, preview=preview)
push_upstream("origin", "main", cwd=repo_dir, preview=preview)
return
except GitPushUpstreamError:
pass
try:
branch_move("master", cwd=repo_dir, preview=preview)
push_upstream("origin", "master", cwd=repo_dir, preview=preview)
except GitPushUpstreamError as exc:
print(f"[WARN] Push failed: {exc}")

View File

@@ -0,0 +1,54 @@
from __future__ import annotations
from typing import Any, Dict
from pkgmgr.actions.mirror.io import write_mirrors_file
from pkgmgr.actions.mirror.setup_cmd import setup_mirrors
Repository = Dict[str, Any]
class MirrorBootstrapper:
"""
MIRRORS is the single source of truth.
We write defaults to MIRRORS and then call mirror setup which will
configure git remotes based on MIRRORS content (but only for git URLs).
"""
def write_defaults(
self,
*,
repo_dir: str,
primary: str,
name: str,
preview: bool,
) -> None:
mirrors = {
# preferred SSH url is supplied by CreateRepoPlanner.primary_remote
"origin": primary,
# metadata only: must NEVER be configured as a git remote
"pypi": f"https://pypi.org/project/{name}/",
}
write_mirrors_file(repo_dir, mirrors, preview=preview)
def setup(
self,
*,
repo: Repository,
repositories_base_dir: str,
all_repos: list[Repository],
preview: bool,
remote: bool,
) -> None:
# IMPORTANT: do NOT set repo["mirrors"] here.
# MIRRORS file is the single source of truth.
setup_mirrors(
selected_repos=[repo],
repositories_base_dir=repositories_base_dir,
all_repos=all_repos,
preview=preview,
local=True,
remote=True,
ensure_remote=remote,
)

View File

@@ -0,0 +1,12 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional
@dataclass(frozen=True)
class RepoParts:
host: str
port: Optional[str]
owner: str
name: str

View File

@@ -0,0 +1,68 @@
from __future__ import annotations
import re
from typing import Tuple
from urllib.parse import urlparse
from .model import RepoParts
_NAME_RE = re.compile(r"^[a-z0-9_-]+$")
def parse_identifier(identifier: str) -> RepoParts:
ident = identifier.strip()
if "://" in ident or ident.startswith("git@"):
return _parse_git_url(ident)
parts = ident.split("/")
if len(parts) != 3:
raise ValueError("Identifier must be URL or 'provider(:port)/owner/repo'.")
host_with_port, owner, name = parts
host, port = _split_host_port(host_with_port)
_ensure_valid_repo_name(name)
return RepoParts(host=host, port=port, owner=owner, name=name)
def _parse_git_url(url: str) -> RepoParts:
if url.startswith("git@") and "://" not in url:
left, right = url.split(":", 1)
host = left.split("@", 1)[1]
owner, name = right.lstrip("/").split("/", 1)
name = _strip_git_suffix(name)
_ensure_valid_repo_name(name)
return RepoParts(host=host, port=None, owner=owner, name=name)
parsed = urlparse(url)
host = parsed.hostname or ""
port = str(parsed.port) if parsed.port else None
path = (parsed.path or "").strip("/")
if not host or "/" not in path:
raise ValueError(f"Could not parse git URL: {url}")
owner, name = path.split("/", 1)
name = _strip_git_suffix(name)
_ensure_valid_repo_name(name)
return RepoParts(host=host, port=port, owner=owner, name=name)
def _split_host_port(host: str) -> Tuple[str, str | None]:
if ":" in host:
h, p = host.split(":", 1)
return h, p or None
return host, None
def _strip_git_suffix(name: str) -> str:
return name[:-4] if name.endswith(".git") else name
def _ensure_valid_repo_name(name: str) -> None:
if not _NAME_RE.fullmatch(name):
raise ValueError(
"Repository name must match: lowercase a-z, 0-9, '_' and '-'."
)

View File

@@ -0,0 +1,52 @@
from __future__ import annotations
import os
from typing import Dict, Any
from .model import RepoParts
class CreateRepoPlanner:
def __init__(self, parts: RepoParts, repositories_base_dir: str):
self.parts = parts
self.repositories_base_dir = os.path.expanduser(repositories_base_dir)
@property
def repo_dir(self) -> str:
return os.path.join(
self.repositories_base_dir,
self.parts.host,
self.parts.owner,
self.parts.name,
)
@property
def homepage(self) -> str:
return f"https://{self.parts.host}/{self.parts.owner}/{self.parts.name}"
@property
def primary_remote(self) -> str:
if self.parts.port:
return (
f"ssh://git@{self.parts.host}:{self.parts.port}/"
f"{self.parts.owner}/{self.parts.name}.git"
)
return f"git@{self.parts.host}:{self.parts.owner}/{self.parts.name}.git"
def template_context(
self,
*,
author_name: str,
author_email: str,
) -> Dict[str, Any]:
return {
"provider": self.parts.host,
"port": self.parts.port,
"account": self.parts.owner,
"repository": self.parts.name,
"homepage": self.homepage,
"author_name": author_name,
"author_email": author_email,
"license_text": f"All rights reserved by {author_name}",
"primary_remote": self.primary_remote,
}

View File

@@ -0,0 +1,97 @@
from __future__ import annotations
import os
from typing import Dict, Any
from pkgmgr.core.git.queries import get_config_value
from .parser import parse_identifier
from .planner import CreateRepoPlanner
from .config_writer import ConfigRepoWriter
from .templates import TemplateRenderer
from .git_bootstrap import GitBootstrapper
from .mirrors import MirrorBootstrapper
class CreateRepoService:
def __init__(
self,
*,
config_merged: Dict[str, Any],
user_config_path: str,
bin_dir: str,
):
self.config_merged = config_merged
self.user_config_path = user_config_path
self.bin_dir = bin_dir
self.templates = TemplateRenderer()
self.git = GitBootstrapper()
self.mirrors = MirrorBootstrapper()
def run(
self,
*,
identifier: str,
preview: bool,
remote: bool,
) -> None:
parts = parse_identifier(identifier)
base_dir = self.config_merged.get("directories", {}).get(
"repositories", "~/Repositories"
)
planner = CreateRepoPlanner(parts, base_dir)
writer = ConfigRepoWriter(
config_merged=self.config_merged,
user_config_path=self.user_config_path,
bin_dir=self.bin_dir,
)
repo = writer.ensure_repo_entry(
host=parts.host,
port=parts.port,
owner=parts.owner,
name=parts.name,
homepage=planner.homepage,
preview=preview,
)
if preview:
print(f"[Preview] Would ensure directory exists: {planner.repo_dir}")
else:
os.makedirs(planner.repo_dir, exist_ok=True)
author_name = get_config_value("user.name") or "Unknown Author"
author_email = get_config_value("user.email") or "unknown@example.invalid"
self.templates.render(
repo_dir=planner.repo_dir,
context=planner.template_context(
author_name=author_name,
author_email=author_email,
),
preview=preview,
)
self.git.init_repo(planner.repo_dir, preview=preview)
self.mirrors.write_defaults(
repo_dir=planner.repo_dir,
primary=planner.primary_remote,
name=parts.name,
preview=preview,
)
self.mirrors.setup(
repo=repo,
repositories_base_dir=os.path.expanduser(base_dir),
all_repos=self.config_merged.get("repositories", []),
preview=preview,
remote=remote,
)
if remote:
self.git.push_default_branch(planner.repo_dir, preview=preview)

View File

@@ -0,0 +1,80 @@
from __future__ import annotations
import os
from pathlib import Path
from typing import Dict, Any
from pkgmgr.core.git.queries import get_repo_root
try:
from jinja2 import Environment, FileSystemLoader, StrictUndefined
except Exception as exc: # pragma: no cover
Environment = None # type: ignore
FileSystemLoader = None # type: ignore
StrictUndefined = None # type: ignore
_JINJA_IMPORT_ERROR = exc
else:
_JINJA_IMPORT_ERROR = None
class TemplateRenderer:
def __init__(self) -> None:
self.templates_dir = self._resolve_templates_dir()
def render(
self,
*,
repo_dir: str,
context: Dict[str, Any],
preview: bool,
) -> None:
if preview:
self._preview()
return
if Environment is None:
raise RuntimeError(
"Jinja2 is required but not available. "
f"Import error: {_JINJA_IMPORT_ERROR}"
)
env = Environment(
loader=FileSystemLoader(self.templates_dir),
undefined=StrictUndefined,
autoescape=False,
keep_trailing_newline=True,
)
for root, _, files in os.walk(self.templates_dir):
for fn in files:
if not fn.endswith(".j2"):
continue
abs_src = os.path.join(root, fn)
rel_src = os.path.relpath(abs_src, self.templates_dir)
rel_out = rel_src[:-3]
abs_out = os.path.join(repo_dir, rel_out)
os.makedirs(os.path.dirname(abs_out), exist_ok=True)
template = env.get_template(rel_src)
rendered = template.render(**context)
with open(abs_out, "w", encoding="utf-8") as f:
f.write(rendered)
def _preview(self) -> None:
for root, _, files in os.walk(self.templates_dir):
for fn in files:
if fn.endswith(".j2"):
rel = os.path.relpath(
os.path.join(root, fn), self.templates_dir
)
print(f"[Preview] Would render template: {rel} -> {rel[:-3]}")
@staticmethod
def _resolve_templates_dir() -> str:
here = Path(__file__).resolve().parent
root = get_repo_root(cwd=str(here))
if not root:
raise RuntimeError("Could not determine repository root for templates.")
return os.path.join(root, "templates", "default")

View File

@@ -1,25 +1,30 @@
#!/usr/bin/env python3 from __future__ import annotations
# -*- coding: utf-8 -*-
import os import os
import subprocess
import sys import sys
from typing import List, Dict, Any
from pkgmgr.core.git.commands import pull_args, GitPullArgsError
from pkgmgr.core.repository.dir import get_repo_dir from pkgmgr.core.repository.dir import get_repo_dir
from pkgmgr.core.repository.identifier import get_repo_identifier from pkgmgr.core.repository.identifier import get_repo_identifier
from pkgmgr.core.repository.verify import verify_repository from pkgmgr.core.repository.verify import verify_repository
Repository = Dict[str, Any]
def pull_with_verification( def pull_with_verification(
selected_repos, selected_repos: List[Repository],
repositories_base_dir, repositories_base_dir: str,
all_repos, all_repos: List[Repository],
extra_args, extra_args: List[str],
no_verification, no_verification: bool,
preview: bool, preview: bool,
) -> None: ) -> None:
""" """
Execute `git pull` for each repository with verification. Execute `git pull` for each repository with verification.
- If verification fails and verification is enabled, prompt user to continue.
- Uses core.git.commands.pull_args() (no raw subprocess usage).
""" """
for repo in selected_repos: for repo in selected_repos:
repo_identifier = get_repo_identifier(repo, all_repos) repo_identifier = get_repo_identifier(repo, all_repos)
@@ -37,12 +42,7 @@ def pull_with_verification(
no_verification=no_verification, no_verification=no_verification,
) )
if ( if not preview and not no_verification and verified_info and not verified_ok:
not preview
and not no_verification
and verified_info
and not verified_ok
):
print(f"Warning: Verification failed for {repo_identifier}:") print(f"Warning: Verification failed for {repo_identifier}:")
for err in errors: for err in errors:
print(f" - {err}") print(f" - {err}")
@@ -50,17 +50,10 @@ def pull_with_verification(
if choice != "y": if choice != "y":
continue continue
args_part = " ".join(extra_args) if extra_args else "" try:
full_cmd = f"git pull{(' ' + args_part) if args_part else ''}" pull_args(extra_args, cwd=repo_dir, preview=preview)
except GitPullArgsError as exc:
if preview: # Keep behavior consistent with previous implementation:
print(f"[Preview] In '{repo_dir}': {full_cmd}") # stop on first failure and propagate return code as generic failure.
else: print(str(exc))
print(f"Running in '{repo_dir}': {full_cmd}") sys.exit(1)
result = subprocess.run(full_cmd, cwd=repo_dir, shell=True, check=False)
if result.returncode != 0:
print(
f"'git pull' for {repo_identifier} failed "
f"with exit code {result.returncode}."
)
sys.exit(result.returncode)

View File

@@ -1,105 +0,0 @@
from __future__ import annotations
import os
import subprocess
from pathlib import Path
from typing import Any, Dict, Optional
try:
from jinja2 import Environment, FileSystemLoader, StrictUndefined
except Exception as exc: # pragma: no cover
Environment = None # type: ignore[assignment]
FileSystemLoader = None # type: ignore[assignment]
StrictUndefined = None # type: ignore[assignment]
_JINJA_IMPORT_ERROR = exc
else:
_JINJA_IMPORT_ERROR = None
def _repo_root_from_here(anchor: Optional[Path] = None) -> str:
"""
Prefer git root (robust in editable installs / different layouts).
Fallback to a conservative relative parent lookup.
"""
here = (anchor or Path(__file__)).resolve().parent
try:
r = subprocess.run(
["git", "rev-parse", "--show-toplevel"],
cwd=str(here),
check=False,
capture_output=True,
text=True,
)
if r.returncode == 0:
top = (r.stdout or "").strip()
if top:
return top
except Exception:
pass
# Fallback: src/pkgmgr/actions/repository/scaffold.py -> <repo root> = parents[5]
p = (anchor or Path(__file__)).resolve()
if len(p.parents) < 6:
raise RuntimeError(f"Unexpected path depth for: {p}")
return str(p.parents[5])
def _templates_dir() -> str:
return os.path.join(_repo_root_from_here(), "templates", "default")
def render_default_templates(
repo_dir: str,
*,
context: Dict[str, Any],
preview: bool,
) -> None:
"""
Render templates/default/*.j2 into repo_dir.
Keeps create.py clean: create.py calls this function only.
"""
tpl_dir = _templates_dir()
if not os.path.isdir(tpl_dir):
raise RuntimeError(f"Templates directory not found: {tpl_dir}")
# Preview mode: do not require Jinja2 at all. We only print planned outputs.
if preview:
for root, _, files in os.walk(tpl_dir):
for fn in files:
if not fn.endswith(".j2"):
continue
abs_src = os.path.join(root, fn)
rel_src = os.path.relpath(abs_src, tpl_dir)
rel_out = rel_src[:-3]
print(f"[Preview] Would render template: {rel_src} -> {rel_out}")
return
if Environment is None or FileSystemLoader is None or StrictUndefined is None:
raise RuntimeError(
"Jinja2 is required for repo templates but is not available. "
f"Import error: {_JINJA_IMPORT_ERROR}"
)
env = Environment(
loader=FileSystemLoader(tpl_dir),
undefined=StrictUndefined,
autoescape=False,
keep_trailing_newline=True,
)
for root, _, files in os.walk(tpl_dir):
for fn in files:
if not fn.endswith(".j2"):
continue
abs_src = os.path.join(root, fn)
rel_src = os.path.relpath(abs_src, tpl_dir)
rel_out = rel_src[:-3]
abs_out = os.path.join(repo_dir, rel_out)
os.makedirs(os.path.dirname(abs_out), exist_ok=True)
template = env.get_template(rel_src)
rendered = template.render(**context)
with open(abs_out, "w", encoding="utf-8") as f:
f.write(rendered)

View File

@@ -1,4 +1,3 @@
# src/pkgmgr/core/credentials/__init__.py
"""Credential resolution for provider APIs.""" """Credential resolution for provider APIs."""
from .resolver import ResolutionOptions, TokenResolver from .resolver import ResolutionOptions, TokenResolver

View File

@@ -3,9 +3,11 @@
from .env import EnvTokenProvider from .env import EnvTokenProvider
from .keyring import KeyringTokenProvider from .keyring import KeyringTokenProvider
from .prompt import PromptTokenProvider from .prompt import PromptTokenProvider
from .gh import GhTokenProvider
__all__ = [ __all__ = [
"EnvTokenProvider", "EnvTokenProvider",
"KeyringTokenProvider", "KeyringTokenProvider",
"PromptTokenProvider", "PromptTokenProvider",
"GhTokenProvider",
] ]

View File

@@ -0,0 +1,43 @@
from __future__ import annotations
import shutil
import subprocess
from dataclasses import dataclass
from typing import Optional
from ..types import TokenRequest, TokenResult
@dataclass(frozen=True)
class GhTokenProvider:
"""
Resolve a GitHub token via GitHub CLI (`gh auth token`).
This does NOT persist anything; it only reads what `gh` already knows.
"""
source_name: str = "gh"
def get(self, request: TokenRequest) -> Optional[TokenResult]:
# Only meaningful for GitHub-like providers
kind = (request.provider_kind or "").strip().lower()
if kind not in ("github", "github.com"):
return None
if not shutil.which("gh"):
return None
host = (request.host or "").strip() or "github.com"
try:
out = subprocess.check_output(
["gh", "auth", "token", "--hostname", host],
stderr=subprocess.STDOUT,
text=True,
).strip()
except Exception:
return None
if not out:
return None
return TokenResult(token=out, source=self.source_name)

View File

@@ -6,9 +6,11 @@ from dataclasses import dataclass
from typing import Optional from typing import Optional
from .providers.env import EnvTokenProvider from .providers.env import EnvTokenProvider
from .providers.gh import GhTokenProvider
from .providers.keyring import KeyringTokenProvider from .providers.keyring import KeyringTokenProvider
from .providers.prompt import PromptTokenProvider from .providers.prompt import PromptTokenProvider
from .types import KeyringUnavailableError, NoCredentialsError, TokenRequest, TokenResult from .types import KeyringUnavailableError, NoCredentialsError, TokenRequest, TokenResult
from .validate import validate_token
@dataclass(frozen=True) @dataclass(frozen=True)
@@ -21,10 +23,24 @@ class ResolutionOptions:
class TokenResolver: class TokenResolver:
"""Resolve tokens from multiple sources (ENV -> Keyring -> Prompt).""" """
Resolve tokens for provider APIs using the following policy:
0) ENV (explicit user intent) -> return as-is (do NOT persist)
1) GitHub CLI (gh) -> if available and token validates, return
2) Keyring -> if token validates, return; if invalid and
interactive prompting is allowed, prompt and
OVERWRITE the keyring entry
3) Prompt -> prompt and (optionally) store in keyring
Notes:
- Keyring requires python-keyring.
- Token validation is provider-specific (currently GitHub cloud).
"""
def __init__(self) -> None: def __init__(self) -> None:
self._env = EnvTokenProvider() self._env = EnvTokenProvider()
self._gh = GhTokenProvider()
self._keyring = KeyringTokenProvider() self._keyring = KeyringTokenProvider()
self._prompt = PromptTokenProvider() self._prompt = PromptTokenProvider()
self._warned_keyring: bool = False self._warned_keyring: bool = False
@@ -48,6 +64,33 @@ class TokenResolver:
print(" sudo dnf install python3-keyring", file=sys.stderr) print(" sudo dnf install python3-keyring", file=sys.stderr)
print("", file=sys.stderr) print("", file=sys.stderr)
def _prompt_and_maybe_store(
self,
request: TokenRequest,
opts: ResolutionOptions,
) -> Optional[TokenResult]:
"""
Prompt for a token and optionally store it in keyring.
If keyring is unavailable, still return the token for this run.
"""
if not (opts.interactive and opts.allow_prompt):
return None
prompt_res = self._prompt.get(request)
if not prompt_res:
return None
if opts.save_prompt_token_to_keyring:
try:
self._keyring.set(request, prompt_res.token) # overwrite is fine
except KeyringUnavailableError as exc:
self._warn_keyring_unavailable(exc)
except Exception:
# If keyring cannot store, still use token for this run.
pass
return prompt_res
def get_token( def get_token(
self, self,
provider_kind: str, provider_kind: str,
@@ -58,16 +101,29 @@ class TokenResolver:
opts = options or ResolutionOptions() opts = options or ResolutionOptions()
request = TokenRequest(provider_kind=provider_kind, host=host, owner=owner) request = TokenRequest(provider_kind=provider_kind, host=host, owner=owner)
# 1) ENV # 0) ENV (highest priority; explicit user intent)
env_res = self._env.get(request) env_res = self._env.get(request)
if env_res: if env_res:
# Do NOT validate or persist env tokens automatically.
return env_res return env_res
# 2) Keyring # 1) GitHub CLI (gh) (auto-read; validate)
gh_res = self._gh.get(request)
if gh_res and validate_token(request.provider_kind, request.host, gh_res.token):
return gh_res
# 2) Keyring (validate; if invalid -> prompt + overwrite)
try: try:
kr_res = self._keyring.get(request) kr_res = self._keyring.get(request)
if kr_res: if kr_res:
if validate_token(request.provider_kind, request.host, kr_res.token):
return kr_res return kr_res
# Token exists but seems invalid -> re-prompt and overwrite keyring.
renewed = self._prompt_and_maybe_store(request, opts)
if renewed:
return renewed
except KeyringUnavailableError as exc: except KeyringUnavailableError as exc:
# Show a helpful warning once, then continue (prompt fallback). # Show a helpful warning once, then continue (prompt fallback).
self._warn_keyring_unavailable(exc) self._warn_keyring_unavailable(exc)
@@ -76,21 +132,12 @@ class TokenResolver:
pass pass
# 3) Prompt (optional) # 3) Prompt (optional)
if opts.interactive and opts.allow_prompt: prompt_res = self._prompt_and_maybe_store(request, opts)
prompt_res = self._prompt.get(request)
if prompt_res: if prompt_res:
if opts.save_prompt_token_to_keyring:
try:
self._keyring.set(request, prompt_res.token)
except KeyringUnavailableError as exc:
self._warn_keyring_unavailable(exc)
except Exception:
# If keyring cannot store, still use token for this run.
pass
return prompt_res return prompt_res
raise NoCredentialsError( raise NoCredentialsError(
f"No token available for {provider_kind}@{host}" f"No token available for {provider_kind}@{host}"
+ (f" (owner: {owner})" if owner else "") + (f" (owner: {owner})" if owner else "")
+ ". Provide it via environment variable or keyring." + ". Provide it via environment variable, keyring, or gh auth."
) )

View File

@@ -44,6 +44,7 @@ def env_var_candidates(provider_kind: str, host: str, owner: Optional[str]) -> l
candidates.append(f"PKGMGR_{kind}_TOKEN") candidates.append(f"PKGMGR_{kind}_TOKEN")
candidates.append(f"PKGMGR_TOKEN_{kind}") candidates.append(f"PKGMGR_TOKEN_{kind}")
candidates.append("PKGMGR_TOKEN") candidates.append("PKGMGR_TOKEN")
return candidates return candidates

View File

@@ -0,0 +1,40 @@
from __future__ import annotations
import urllib.request
import json
def validate_token(provider_kind: str, host: str, token: str) -> bool:
"""
Return True if token appears valid for the provider.
Currently implemented for GitHub only.
"""
kind = (provider_kind or "").strip().lower()
host = (host or "").strip() or "github.com"
token = (token or "").strip()
if not token:
return False
if kind in ("github", "github.com") and host.lower() == "github.com":
req = urllib.request.Request(
"https://api.github.com/user",
headers={
"Authorization": f"Bearer {token}",
"Accept": "application/vnd.github+json",
"User-Agent": "pkgmgr",
},
method="GET",
)
try:
with urllib.request.urlopen(req, timeout=10) as resp:
if resp.status != 200:
return False
# Optional: parse to ensure body is JSON
_ = json.loads(resp.read().decode("utf-8"))
return True
except Exception:
return False
# Unknown provider: don't hard-fail validation (conservative default)
# If you prefer strictness: return False here.
return True

View File

@@ -1,5 +1,8 @@
from __future__ import annotations from __future__ import annotations
from .errors import GitError
from .run import run
""" """
Lightweight helper functions around Git commands. Lightweight helper functions around Git commands.
@@ -8,10 +11,7 @@ logic (release, version, changelog) does not have to deal with the
details of subprocess handling. details of subprocess handling.
""" """
from .errors import GitError
from .run import run
__all__ = [ __all__ = [
"GitError", "GitError",
"run" "run",
] ]

View File

@@ -1,25 +1,40 @@
# src/pkgmgr/core/git/commands/__init__.py
from __future__ import annotations from __future__ import annotations
from .add import GitAddError, add
from .add_all import GitAddAllError, add_all
from .add_remote import GitAddRemoteError, add_remote
from .add_remote_push_url import GitAddRemotePushUrlError, add_remote_push_url
from .branch_move import GitBranchMoveError, branch_move
from .checkout import GitCheckoutError, checkout from .checkout import GitCheckoutError, checkout
from .clone import GitCloneError, clone
from .commit import GitCommitError, commit
from .create_branch import GitCreateBranchError, create_branch
from .delete_local_branch import GitDeleteLocalBranchError, delete_local_branch from .delete_local_branch import GitDeleteLocalBranchError, delete_local_branch
from .delete_remote_branch import GitDeleteRemoteBranchError, delete_remote_branch from .delete_remote_branch import GitDeleteRemoteBranchError, delete_remote_branch
from .fetch import GitFetchError, fetch from .fetch import GitFetchError, fetch
from .init import GitInitError, init
from .merge_no_ff import GitMergeError, merge_no_ff from .merge_no_ff import GitMergeError, merge_no_ff
from .pull import GitPullError, pull from .pull import GitPullError, pull
from .pull_args import GitPullArgsError, pull_args # <-- add
from .pull_ff_only import GitPullFfOnlyError, pull_ff_only
from .push import GitPushError, push from .push import GitPushError, push
from .create_branch import GitCreateBranchError, create_branch
from .push_upstream import GitPushUpstreamError, push_upstream from .push_upstream import GitPushUpstreamError, push_upstream
from .add_remote import GitAddRemoteError, add_remote
from .set_remote_url import GitSetRemoteUrlError, set_remote_url from .set_remote_url import GitSetRemoteUrlError, set_remote_url
from .add_remote_push_url import GitAddRemotePushUrlError, add_remote_push_url from .tag_annotated import GitTagAnnotatedError, tag_annotated
from .tag_force_annotated import GitTagForceAnnotatedError, tag_force_annotated
__all__ = [ __all__ = [
"add",
"add_all",
"fetch", "fetch",
"checkout", "checkout",
"pull", "pull",
"pull_args", # <-- add
"pull_ff_only",
"merge_no_ff", "merge_no_ff",
"push", "push",
"commit",
"delete_local_branch", "delete_local_branch",
"delete_remote_branch", "delete_remote_branch",
"create_branch", "create_branch",
@@ -27,11 +42,21 @@ __all__ = [
"add_remote", "add_remote",
"set_remote_url", "set_remote_url",
"add_remote_push_url", "add_remote_push_url",
"tag_annotated",
"tag_force_annotated",
"clone",
"init",
"branch_move",
"GitAddError",
"GitAddAllError",
"GitFetchError", "GitFetchError",
"GitCheckoutError", "GitCheckoutError",
"GitPullError", "GitPullError",
"GitPullArgsError", # <-- add
"GitPullFfOnlyError",
"GitMergeError", "GitMergeError",
"GitPushError", "GitPushError",
"GitCommitError",
"GitDeleteLocalBranchError", "GitDeleteLocalBranchError",
"GitDeleteRemoteBranchError", "GitDeleteRemoteBranchError",
"GitCreateBranchError", "GitCreateBranchError",
@@ -39,4 +64,9 @@ __all__ = [
"GitAddRemoteError", "GitAddRemoteError",
"GitSetRemoteUrlError", "GitSetRemoteUrlError",
"GitAddRemotePushUrlError", "GitAddRemotePushUrlError",
"GitTagAnnotatedError",
"GitTagForceAnnotatedError",
"GitCloneError",
"GitInitError",
"GitBranchMoveError",
] ]

View File

@@ -0,0 +1,44 @@
from __future__ import annotations
from typing import Iterable, List, Sequence, Union
from ..errors import GitError, GitCommandError
from ..run import run
class GitAddError(GitCommandError):
"""Raised when `git add` fails."""
PathLike = Union[str, Sequence[str], Iterable[str]]
def _normalize_paths(paths: PathLike) -> List[str]:
if isinstance(paths, str):
return [paths]
return [p for p in paths]
def add(
paths: PathLike,
*,
cwd: str = ".",
preview: bool = False,
) -> None:
"""
Stage one or multiple paths.
Equivalent to:
git add <path...>
"""
normalized = _normalize_paths(paths)
if not normalized:
return
try:
run(["add", *normalized], cwd=cwd, preview=preview)
except GitError as exc:
raise GitAddError(
f"Failed to add paths to staging area: {normalized!r}.",
cwd=cwd,
) from exc

View File

@@ -0,0 +1,22 @@
# src/pkgmgr/core/git/commands/add_all.py
from __future__ import annotations
from ..errors import GitError, GitCommandError
from ..run import run
class GitAddAllError(GitCommandError):
"""Raised when `git add -A` fails."""
def add_all(*, cwd: str = ".", preview: bool = False) -> None:
"""
Stage all changes (tracked + untracked).
Equivalent to:
git add -A
"""
try:
run(["add", "-A"], cwd=cwd, preview=preview)
except GitError as exc:
raise GitAddAllError("Failed to stage all changes with `git add -A`.", cwd=cwd) from exc

View File

@@ -0,0 +1,22 @@
# src/pkgmgr/core/git/commands/branch_move.py
from __future__ import annotations
from ..errors import GitError, GitCommandError
from ..run import run
class GitBranchMoveError(GitCommandError):
"""Raised when renaming/moving a branch fails."""
def branch_move(branch: str, *, cwd: str = ".", preview: bool = False) -> None:
"""
Rename the current branch to `branch`, creating it if needed.
Equivalent to:
git branch -M <branch>
"""
try:
run(["branch", "-M", branch], cwd=cwd, preview=preview)
except GitError as exc:
raise GitBranchMoveError(f"Failed to move/rename current branch to {branch!r}.", cwd=cwd) from exc

View File

@@ -0,0 +1,32 @@
from __future__ import annotations
from typing import List
from ..errors import GitError, GitCommandError
from ..run import run
class GitCloneError(GitCommandError):
"""Raised when `git clone` fails."""
def clone(
args: List[str],
*,
cwd: str = ".",
preview: bool = False,
) -> None:
"""
Execute `git clone` with caller-provided arguments.
Examples:
["https://example.com/repo.git", "/path/to/dir"]
["--depth", "1", "--single-branch", url, dest]
"""
try:
run(["clone", *args], cwd=cwd, preview=preview)
except GitError as exc:
raise GitCloneError(
f"Git clone failed with args={args!r}.",
cwd=cwd,
) from exc

View File

@@ -0,0 +1,37 @@
from __future__ import annotations
from ..errors import GitError, GitCommandError
from ..run import run
class GitCommitError(GitCommandError):
"""Raised when `git commit` fails."""
def commit(
message: str,
*,
cwd: str = ".",
all: bool = False,
preview: bool = False,
) -> None:
"""
Create a commit.
Equivalent to:
git commit -m "<message>"
or (if all=True):
git commit -am "<message>"
"""
args = ["commit"]
if all:
args.append("-a")
args += ["-m", message]
try:
run(args, cwd=cwd, preview=preview)
except GitError as exc:
raise GitCommitError(
"Failed to create commit.",
cwd=cwd,
) from exc

View File

@@ -8,9 +8,31 @@ class GitFetchError(GitCommandError):
"""Raised when fetching from a remote fails.""" """Raised when fetching from a remote fails."""
def fetch(remote: str = "origin", cwd: str = ".") -> None: def fetch(
remote: str = "origin",
*,
prune: bool = False,
tags: bool = False,
force: bool = False,
cwd: str = ".",
preview: bool = False,
) -> None:
"""
Fetch from a remote, optionally with prune/tags/force.
Equivalent to:
git fetch <remote> [--prune] [--tags] [--force]
"""
args = ["fetch", remote]
if prune:
args.append("--prune")
if tags:
args.append("--tags")
if force:
args.append("--force")
try: try:
run(["fetch", remote], cwd=cwd) run(args, cwd=cwd, preview=preview)
except GitError as exc: except GitError as exc:
raise GitFetchError( raise GitFetchError(
f"Failed to fetch from remote {remote!r}.", f"Failed to fetch from remote {remote!r}.",

View File

@@ -0,0 +1,22 @@
# src/pkgmgr/core/git/commands/init.py
from __future__ import annotations
from ..errors import GitError, GitCommandError
from ..run import run
class GitInitError(GitCommandError):
"""Raised when `git init` fails."""
def init(*, cwd: str = ".", preview: bool = False) -> None:
"""
Initialize a repository.
Equivalent to:
git init
"""
try:
run(["init"], cwd=cwd, preview=preview)
except GitError as exc:
raise GitInitError("Failed to initialize git repository.", cwd=cwd) from exc

View File

@@ -0,0 +1,35 @@
from __future__ import annotations
from typing import List
from ..errors import GitError, GitCommandError
from ..run import run
class GitPullArgsError(GitCommandError):
"""Raised when `git pull` with arbitrary args fails."""
def pull_args(
args: List[str] | None = None,
*,
cwd: str = ".",
preview: bool = False,
) -> None:
"""
Execute `git pull` with caller-provided arguments.
Examples:
[] -> git pull
["--ff-only"] -> git pull --ff-only
["--rebase"] -> git pull --rebase
["origin", "main"] -> git pull origin main
"""
extra = args or []
try:
run(["pull", *extra], cwd=cwd, preview=preview)
except GitError as exc:
raise GitPullArgsError(
f"Failed to run `git pull` with args={extra!r}.",
cwd=cwd,
) from exc

View File

@@ -0,0 +1,24 @@
from __future__ import annotations
from ..errors import GitError, GitCommandError
from ..run import run
class GitPullFfOnlyError(GitCommandError):
"""Raised when pulling with --ff-only fails."""
def pull_ff_only(*, cwd: str = ".", preview: bool = False) -> None:
"""
Pull using fast-forward only.
Equivalent to:
git pull --ff-only
"""
try:
run(["pull", "--ff-only"], cwd=cwd, preview=preview)
except GitError as exc:
raise GitPullFfOnlyError(
"Failed to pull with --ff-only.",
cwd=cwd,
) from exc

View File

@@ -8,9 +8,26 @@ class GitPushError(GitCommandError):
"""Raised when pushing to a remote fails.""" """Raised when pushing to a remote fails."""
def push(remote: str, ref: str, cwd: str = ".") -> None: def push(
remote: str,
ref: str,
*,
force: bool = False,
cwd: str = ".",
preview: bool = False,
) -> None:
"""
Push a ref to a remote, optionally forced.
Equivalent to:
git push <remote> <ref> [--force]
"""
args = ["push", remote, ref]
if force:
args.append("--force")
try: try:
run(["push", remote, ref], cwd=cwd) run(args, cwd=cwd, preview=preview)
except GitError as exc: except GitError as exc:
raise GitPushError( raise GitPushError(
f"Failed to push ref {ref!r} to remote {remote!r}.", f"Failed to push ref {ref!r} to remote {remote!r}.",

View File

@@ -1,3 +1,4 @@
# src/pkgmgr/core/git/commands/push_upstream.py
from __future__ import annotations from __future__ import annotations
from ..errors import GitError, GitCommandError from ..errors import GitError, GitCommandError
@@ -8,14 +9,21 @@ class GitPushUpstreamError(GitCommandError):
"""Raised when pushing a branch with upstream tracking fails.""" """Raised when pushing a branch with upstream tracking fails."""
def push_upstream(remote: str, branch: str, cwd: str = ".") -> None: def push_upstream(
remote: str,
branch: str,
*,
cwd: str = ".",
preview: bool = False,
) -> None:
""" """
Push a branch and set upstream tracking. Push a branch and set upstream tracking.
Equivalent to: git push -u <remote> <branch> Equivalent to:
git push -u <remote> <branch>
""" """
try: try:
run(["push", "-u", remote, branch], cwd=cwd) run(["push", "-u", remote, branch], cwd=cwd, preview=preview)
except GitError as exc: except GitError as exc:
raise GitPushUpstreamError( raise GitPushUpstreamError(
f"Failed to push branch {branch!r} to {remote!r} with upstream tracking.", f"Failed to push branch {branch!r} to {remote!r} with upstream tracking.",

View File

@@ -0,0 +1,30 @@
from __future__ import annotations
from ..errors import GitError, GitCommandError
from ..run import run
class GitTagAnnotatedError(GitCommandError):
"""Raised when creating an annotated tag fails."""
def tag_annotated(
tag: str,
message: str,
*,
cwd: str = ".",
preview: bool = False,
) -> None:
"""
Create an annotated tag.
Equivalent to:
git tag -a <tag> -m "<message>"
"""
try:
run(["tag", "-a", tag, "-m", message], cwd=cwd, preview=preview)
except GitError as exc:
raise GitTagAnnotatedError(
f"Failed to create annotated tag {tag!r}.",
cwd=cwd,
) from exc

View File

@@ -0,0 +1,31 @@
from __future__ import annotations
from ..errors import GitError, GitCommandError
from ..run import run
class GitTagForceAnnotatedError(GitCommandError):
"""Raised when forcing an annotated tag fails."""
def tag_force_annotated(
name: str,
target: str,
message: str,
*,
cwd: str = ".",
preview: bool = False,
) -> None:
"""
Force-create an annotated tag pointing at a given target.
Equivalent to:
git tag -f -a <name> <target> -m "<message>"
"""
try:
run(["tag", "-f", "-a", name, target, "-m", message], cwd=cwd, preview=preview)
except GitError as exc:
raise GitTagForceAnnotatedError(
f"Failed to force annotated tag {name!r} at {target!r}.",
cwd=cwd,
) from exc

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
from .get_current_branch import get_current_branch from .get_current_branch import get_current_branch
from .get_head_commit import get_head_commit from .get_head_commit import get_head_commit
from .get_latest_commit import get_latest_commit
from .get_tags import get_tags from .get_tags import get_tags
from .resolve_base_branch import GitBaseBranchNotFoundError, resolve_base_branch from .resolve_base_branch import GitBaseBranchNotFoundError, resolve_base_branch
from .list_remotes import list_remotes from .list_remotes import list_remotes
@@ -9,10 +10,15 @@ from .get_remote_push_urls import get_remote_push_urls
from .probe_remote_reachable import probe_remote_reachable from .probe_remote_reachable import probe_remote_reachable
from .get_changelog import get_changelog, GitChangelogQueryError from .get_changelog import get_changelog, GitChangelogQueryError
from .get_tags_at_ref import get_tags_at_ref, GitTagsAtRefQueryError from .get_tags_at_ref import get_tags_at_ref, GitTagsAtRefQueryError
from .get_config_value import get_config_value
from .get_upstream_ref import get_upstream_ref
from .list_tags import list_tags
from .get_repo_root import get_repo_root
__all__ = [ __all__ = [
"get_current_branch", "get_current_branch",
"get_head_commit", "get_head_commit",
"get_latest_commit",
"get_tags", "get_tags",
"resolve_base_branch", "resolve_base_branch",
"GitBaseBranchNotFoundError", "GitBaseBranchNotFoundError",
@@ -23,4 +29,8 @@ __all__ = [
"GitChangelogQueryError", "GitChangelogQueryError",
"get_tags_at_ref", "get_tags_at_ref",
"GitTagsAtRefQueryError", "GitTagsAtRefQueryError",
"get_config_value",
"get_upstream_ref",
"list_tags",
"get_repo_root",
] ]

View File

@@ -0,0 +1,34 @@
from __future__ import annotations
from typing import Optional
from ..errors import GitError
from ..run import run
def _is_missing_key_error(exc: GitError) -> bool:
msg = str(exc).lower()
# Ensure we only swallow the expected case for THIS command.
if "git config --get" not in msg:
return False
# 'git config --get' returns exit code 1 when the key is not set.
return "exit code: 1" in msg
def get_config_value(key: str, *, cwd: str = ".") -> Optional[str]:
"""
Return a value from `git config --get <key>`, or None if not set.
We keep core.git.run() strict (check=True) and interpret the known
'not set' exit-code case here.
"""
try:
output = run(["config", "--get", key], cwd=cwd)
except GitError as exc:
if _is_missing_key_error(exc):
return None
raise
output = output.strip()
return output or None

View File

@@ -0,0 +1,26 @@
from __future__ import annotations
from typing import Optional
from ..errors import GitError
from ..run import run
def get_latest_commit(cwd: str = ".") -> Optional[str]:
"""
Return the latest commit hash for the repository in `cwd`.
Equivalent to:
git log -1 --format=%H
Returns:
The commit hash string, or None if it cannot be determined
(e.g. not a git repo, no commits, or other git failure).
"""
try:
output = run(["log", "-1", "--format=%H"], cwd=cwd)
except GitError:
return None
output = output.strip()
return output or None

View File

@@ -2,7 +2,6 @@ from __future__ import annotations
from typing import Set from typing import Set
from ..errors import GitError
from ..run import run from ..run import run

View File

@@ -0,0 +1,23 @@
# src/pkgmgr/core/git/queries/get_repo_root.py
from __future__ import annotations
from typing import Optional
from ..errors import GitError
from ..run import run
def get_repo_root(*, cwd: str = ".") -> Optional[str]:
"""
Return the git repository root directory (top-level), or None if not available.
Equivalent to:
git rev-parse --show-toplevel
"""
try:
out = run(["rev-parse", "--show-toplevel"], cwd=cwd)
except GitError:
return None
out = out.strip()
return out or None

View File

@@ -0,0 +1,25 @@
from __future__ import annotations
from typing import Optional
from ..errors import GitError
from ..run import run
def get_upstream_ref(*, cwd: str = ".") -> Optional[str]:
"""
Return the configured upstream ref for the current branch, or None if none.
Equivalent to:
git rev-parse --abbrev-ref --symbolic-full-name @{u}
"""
try:
out = run(
["rev-parse", "--abbrev-ref", "--symbolic-full-name", "@{u}"],
cwd=cwd,
)
except GitError:
return None
out = out.strip()
return out or None

View File

@@ -2,7 +2,6 @@ from __future__ import annotations
from typing import List from typing import List
from ..errors import GitError
from ..run import run from ..run import run

View File

@@ -0,0 +1,18 @@
from __future__ import annotations
from typing import List
from ..run import run
def list_tags(pattern: str = "*", *, cwd: str = ".") -> List[str]:
"""
List tags matching a pattern.
Equivalent to:
git tag --list <pattern>
"""
out = run(["tag", "--list", pattern], cwd=cwd)
if not out:
return []
return [line.strip() for line in out.splitlines() if line.strip()]

View File

@@ -1,42 +0,0 @@
from __future__ import annotations
import io
import unittest
from contextlib import redirect_stdout
from unittest.mock import patch
from pkgmgr.actions.repository.create import create_repo
class TestE2ECreateRepoPreviewOutput(unittest.TestCase):
def test_create_repo_preview_prints_expected_steps(self) -> None:
cfg = {"directories": {"repositories": "/tmp/Repositories"}, "repositories": []}
out = io.StringIO()
with (
redirect_stdout(out),
patch("pkgmgr.actions.repository.create.os.path.exists", return_value=False),
patch("pkgmgr.actions.repository.create.generate_alias", return_value="repo"),
patch("pkgmgr.actions.repository.create.save_user_config"),
patch("pkgmgr.actions.repository.create.os.makedirs"),
patch("pkgmgr.actions.repository.create.render_default_templates"),
patch("pkgmgr.actions.repository.create.write_mirrors_file"),
patch("pkgmgr.actions.repository.create.setup_mirrors"),
patch("pkgmgr.actions.repository.create.subprocess.run"),
):
create_repo(
"github.com/acme/repo",
cfg,
"/tmp/user.yml",
"/tmp/bin",
remote=False,
preview=True,
)
s = out.getvalue()
self.assertIn("[Preview] Would save user config:", s)
self.assertIn("[Preview] Would ensure directory exists:", s)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,75 @@
from __future__ import annotations
import io
import unittest
from contextlib import redirect_stdout
from unittest.mock import patch
from pkgmgr.actions.repository.create import create_repo
class TestCreateRepoPreviewOutput(unittest.TestCase):
def test_create_repo_preview_prints_expected_steps(self) -> None:
cfg = {"directories": {"repositories": "/tmp/Repositories"}, "repositories": []}
out = io.StringIO()
with (
redirect_stdout(out),
patch(
"pkgmgr.actions.repository.create.config_writer.generate_alias",
return_value="repo",
),
patch(
"pkgmgr.actions.repository.create.config_writer.save_user_config",
),
patch(
"pkgmgr.actions.repository.create.config_writer.os.path.exists",
return_value=False,
),
patch(
"pkgmgr.actions.repository.create.service.os.makedirs",
),
patch(
"pkgmgr.actions.repository.create.templates.TemplateRenderer._resolve_templates_dir",
return_value="/tpl",
),
patch(
"pkgmgr.actions.repository.create.templates.os.walk",
return_value=[("/tpl", [], ["README.md.j2"])],
),
patch(
"pkgmgr.actions.repository.create.git_bootstrap.init",
),
patch(
"pkgmgr.actions.repository.create.git_bootstrap.add_all",
),
patch(
"pkgmgr.actions.repository.create.git_bootstrap.commit",
),
patch(
"pkgmgr.actions.repository.create.mirrors.write_mirrors_file",
),
patch(
"pkgmgr.actions.repository.create.mirrors.setup_mirrors",
),
patch(
"pkgmgr.actions.repository.create.service.get_config_value",
return_value=None,
),
):
create_repo(
"github.com/acme/repo",
cfg,
"/tmp/user.yml",
"/tmp/bin",
remote=False,
preview=True,
)
s = out.getvalue()
self.assertIn("[Preview] Would add repository to config:", s)
self.assertIn("[Preview] Would ensure directory exists:", s)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,115 @@
# tests/integration/test_repos_create_pypi_not_in_git_config.py
from __future__ import annotations
import os
import subprocess
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch
from pkgmgr.actions.repository.create import create_repo
class TestCreateRepoPypiNotInGitConfig(unittest.TestCase):
def test_create_repo_writes_pypi_to_mirrors_but_not_git_config(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)
# Repositories base dir used by create flow
repos_base = tmp_path / "Repositories"
user_cfg = tmp_path / "user.yml"
bin_dir = tmp_path / "bin"
bin_dir.mkdir(parents=True, exist_ok=True)
cfg = {
"directories": {"repositories": str(repos_base)},
"repositories": [],
}
# Provide a minimal templates directory so TemplateRenderer can run
tpl_dir = tmp_path / "tpl"
tpl_dir.mkdir(parents=True, exist_ok=True)
(tpl_dir / "README.md.j2").write_text(
"# {{ repository }}\n", encoding="utf-8"
)
# Expected repo dir for identifier github.com/acme/repo
repo_dir = repos_base / "github.com" / "acme" / "repo"
with (
# Avoid any real network calls during mirror "remote probing"
patch(
"pkgmgr.actions.mirror.setup_cmd.probe_remote_reachable",
return_value=True,
),
# Force templates to come from our temp directory
patch(
"pkgmgr.actions.repository.create.templates.TemplateRenderer._resolve_templates_dir",
return_value=str(tpl_dir),
),
# Make git commit deterministic without depending on global git config
patch.dict(
os.environ,
{
"GIT_AUTHOR_NAME": "Test Author",
"GIT_AUTHOR_EMAIL": "author@example.invalid",
"GIT_COMMITTER_NAME": "Test Author",
"GIT_COMMITTER_EMAIL": "author@example.invalid",
},
clear=False,
),
):
create_repo(
"github.com/acme/repo",
cfg,
str(user_cfg),
str(bin_dir),
remote=False,
preview=False,
)
# --- Assertions: MIRRORS file ---
mirrors_file = repo_dir / "MIRRORS"
self.assertTrue(mirrors_file.exists(), "MIRRORS file was not created")
mirrors_content = mirrors_file.read_text(encoding="utf-8")
self.assertIn(
"pypi https://pypi.org/project/repo/",
mirrors_content,
"PyPI mirror entry must exist in MIRRORS",
)
self.assertIn(
"origin git@github.com:acme/repo.git",
mirrors_content,
"origin SSH URL must exist in MIRRORS",
)
# --- Assertions: git config must NOT contain PyPI ---
git_config = repo_dir / ".git" / "config"
self.assertTrue(git_config.exists(), ".git/config was not created")
git_config_content = git_config.read_text(encoding="utf-8")
self.assertNotIn(
"pypi.org/project",
git_config_content,
"PyPI must never be written into git config",
)
# --- Assertions: origin remote exists and points to SSH ---
remotes = subprocess.check_output(
["git", "-C", str(repo_dir), "remote"],
text=True,
).splitlines()
self.assertIn("origin", remotes, "origin remote was not created")
remote_v = subprocess.check_output(
["git", "-C", str(repo_dir), "remote", "-v"],
text=True,
)
self.assertIn("git@github.com:acme/repo.git", remote_v)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,106 @@
from __future__ import annotations
import unittest
from unittest.mock import patch
from pkgmgr.core.credentials.resolver import TokenResolver
from pkgmgr.core.credentials.types import TokenResult
class TestTokenResolverIntegration(unittest.TestCase):
def test_full_resolution_flow_with_invalid_gh_and_keyring_then_prompt(self) -> None:
"""
Full integration scenario:
- ENV provides nothing
- GitHub CLI (gh) is available and returns a token, but it is INVALID
- Keyring contains a token, but it is INVALID
- Interactive prompt provides a NEW token
- New token is ACCEPTED and OVERWRITES the keyring entry
"""
resolver = TokenResolver()
# ------------------------------------------------------------------
# 1) ENV: empty
# ------------------------------------------------------------------
with patch.dict("os.environ", {}, clear=True):
# ------------------------------------------------------------------
# 2) GH CLI is available
# ------------------------------------------------------------------
with patch(
"pkgmgr.core.credentials.providers.gh.shutil.which",
return_value="/usr/bin/gh",
):
with patch(
"pkgmgr.core.credentials.providers.gh.subprocess.check_output",
return_value="gh-invalid-token\n",
):
# ------------------------------------------------------------------
# 3) Keyring returns an existing (invalid) token
# ------------------------------------------------------------------
with patch(
"pkgmgr.core.credentials.providers.keyring._import_keyring"
) as mock_import_keyring:
mock_keyring = mock_import_keyring.return_value
mock_keyring.get_password.return_value = "keyring-invalid-token"
# ------------------------------------------------------------------
# 4) Prompt is allowed and returns a NEW token
# ------------------------------------------------------------------
with patch(
"pkgmgr.core.credentials.providers.prompt.sys.stdin.isatty",
return_value=True,
):
with patch(
"pkgmgr.core.credentials.providers.prompt.getpass",
return_value="new-valid-token",
):
# ------------------------------------------------------------------
# 5) Validation logic:
# - gh token invalid
# - keyring token invalid
# - prompt token is NOT validated (by design)
# ------------------------------------------------------------------
def validate_side_effect(
provider_kind: str,
host: str,
token: str,
) -> bool:
return False # gh + keyring invalid
with patch(
"pkgmgr.core.credentials.resolver.validate_token",
side_effect=validate_side_effect,
) as validate_mock:
result = resolver.get_token(
provider_kind="github",
host="github.com",
)
# ----------------------------------------------------------------------
# Assertions
# ----------------------------------------------------------------------
self.assertIsInstance(result, TokenResult)
self.assertEqual(result.token, "new-valid-token")
self.assertEqual(result.source, "prompt")
# validate_token was called ONLY for gh and keyring
validated_tokens = [call.args[2] for call in validate_mock.call_args_list]
self.assertIn("gh-invalid-token", validated_tokens)
self.assertIn("keyring-invalid-token", validated_tokens)
self.assertNotIn("new-valid-token", validated_tokens)
# Keyring must be overwritten with the new token
mock_keyring.set_password.assert_called_once()
service, username, stored_token = mock_keyring.set_password.call_args.args
self.assertEqual(stored_token, "new-valid-token")
if __name__ == "__main__":
unittest.main()

View File

@@ -1,198 +0,0 @@
from __future__ import annotations
import unittest
from unittest.mock import patch
from pkgmgr.core.git import GitError
from pkgmgr.actions.release.git_ops import (
ensure_clean_and_synced,
is_highest_version_tag,
run_git_command,
update_latest_tag,
)
class TestRunGitCommand(unittest.TestCase):
@patch("pkgmgr.actions.release.git_ops.subprocess.run")
def test_run_git_command_success(self, mock_run) -> None:
run_git_command("git status")
mock_run.assert_called_once()
args, kwargs = mock_run.call_args
self.assertIn("git status", args[0])
self.assertTrue(kwargs.get("check"))
self.assertTrue(kwargs.get("capture_output"))
self.assertTrue(kwargs.get("text"))
@patch("pkgmgr.actions.release.git_ops.subprocess.run")
def test_run_git_command_failure_raises_git_error(self, mock_run) -> None:
from subprocess import CalledProcessError
mock_run.side_effect = CalledProcessError(
returncode=1,
cmd="git status",
output="stdout",
stderr="stderr",
)
with self.assertRaises(GitError):
run_git_command("git status")
class TestEnsureCleanAndSynced(unittest.TestCase):
def _fake_run(self, cmd: str, *args, **kwargs):
class R:
def __init__(self, stdout: str = "", stderr: str = "", returncode: int = 0):
self.stdout = stdout
self.stderr = stderr
self.returncode = returncode
# upstream detection
if "git rev-parse --abbrev-ref --symbolic-full-name @{u}" in cmd:
return R(stdout="origin/main")
# fetch/pull should be invoked in real mode
if cmd == "git fetch --prune --tags":
return R(stdout="")
if cmd == "git pull --ff-only":
return R(stdout="Already up to date.")
return R(stdout="")
@patch("pkgmgr.actions.release.git_ops.subprocess.run")
def test_ensure_clean_and_synced_preview_does_not_run_git_commands(self, mock_run) -> None:
def fake(cmd: str, *args, **kwargs):
class R:
def __init__(self, stdout: str = ""):
self.stdout = stdout
self.stderr = ""
self.returncode = 0
if "git rev-parse --abbrev-ref --symbolic-full-name @{u}" in cmd:
return R(stdout="origin/main")
return R(stdout="")
mock_run.side_effect = fake
ensure_clean_and_synced(preview=True)
called_cmds = [c.args[0] for c in mock_run.call_args_list]
self.assertTrue(any("git rev-parse" in c for c in called_cmds))
self.assertFalse(any(c == "git fetch --prune --tags" for c in called_cmds))
self.assertFalse(any(c == "git pull --ff-only" for c in called_cmds))
@patch("pkgmgr.actions.release.git_ops.subprocess.run")
def test_ensure_clean_and_synced_no_upstream_skips(self, mock_run) -> None:
def fake(cmd: str, *args, **kwargs):
class R:
def __init__(self, stdout: str = ""):
self.stdout = stdout
self.stderr = ""
self.returncode = 0
if "git rev-parse --abbrev-ref --symbolic-full-name @{u}" in cmd:
return R(stdout="") # no upstream
return R(stdout="")
mock_run.side_effect = fake
ensure_clean_and_synced(preview=False)
called_cmds = [c.args[0] for c in mock_run.call_args_list]
self.assertTrue(any("git rev-parse" in c for c in called_cmds))
self.assertFalse(any(c == "git fetch --prune --tags" for c in called_cmds))
self.assertFalse(any(c == "git pull --ff-only" for c in called_cmds))
@patch("pkgmgr.actions.release.git_ops.subprocess.run")
def test_ensure_clean_and_synced_real_runs_fetch_and_pull(self, mock_run) -> None:
mock_run.side_effect = self._fake_run
ensure_clean_and_synced(preview=False)
called_cmds = [c.args[0] for c in mock_run.call_args_list]
self.assertIn("git fetch origin --prune --tags --force", called_cmds)
self.assertIn("git pull --ff-only", called_cmds)
class TestIsHighestVersionTag(unittest.TestCase):
@patch("pkgmgr.actions.release.git_ops.subprocess.run")
def test_is_highest_version_tag_no_tags_true(self, mock_run) -> None:
def fake(cmd: str, *args, **kwargs):
class R:
def __init__(self, stdout: str = ""):
self.stdout = stdout
self.stderr = ""
self.returncode = 0
if "git tag --list" in cmd and "'v*'" in cmd:
return R(stdout="") # no tags
return R(stdout="")
mock_run.side_effect = fake
self.assertTrue(is_highest_version_tag("v1.0.0"))
# ensure at least the list command was queried
called_cmds = [c.args[0] for c in mock_run.call_args_list]
self.assertTrue(any("git tag --list" in c for c in called_cmds))
@patch("pkgmgr.actions.release.git_ops.subprocess.run")
def test_is_highest_version_tag_compares_sort_v(self, mock_run) -> None:
"""
This test is aligned with the CURRENT implementation:
return tag >= latest
which is a *string comparison*, not a semantic version compare.
Therefore, a candidate like v1.2.0 is lexicographically >= v1.10.0
(because '2' > '1' at the first differing char after 'v1.').
"""
def fake(cmd: str, *args, **kwargs):
class R:
def __init__(self, stdout: str = ""):
self.stdout = stdout
self.stderr = ""
self.returncode = 0
if cmd.strip() == "git tag --list 'v*'":
return R(stdout="v1.0.0\nv1.2.0\nv1.10.0\n")
if "git tag --list 'v*'" in cmd and "sort -V" in cmd and "tail -n1" in cmd:
return R(stdout="v1.10.0")
return R(stdout="")
mock_run.side_effect = fake
# With the current implementation (string >=), both of these are True.
self.assertTrue(is_highest_version_tag("v1.10.0"))
self.assertTrue(is_highest_version_tag("v1.2.0"))
# And a clearly lexicographically smaller candidate should be False.
# Example: "v1.0.0" < "v1.10.0"
self.assertFalse(is_highest_version_tag("v1.0.0"))
# Ensure both capture commands were executed
called_cmds = [c.args[0] for c in mock_run.call_args_list]
self.assertTrue(any(cmd == "git tag --list 'v*'" for cmd in called_cmds))
self.assertTrue(any("sort -V" in cmd and "tail -n1" in cmd for cmd in called_cmds))
class TestUpdateLatestTag(unittest.TestCase):
@patch("pkgmgr.actions.release.git_ops.run_git_command")
def test_update_latest_tag_preview_does_not_call_git(self, mock_run_git_command) -> None:
update_latest_tag("v1.2.3", preview=True)
mock_run_git_command.assert_not_called()
@patch("pkgmgr.actions.release.git_ops.run_git_command")
def test_update_latest_tag_real_calls_git(self, mock_run_git_command) -> None:
update_latest_tag("v1.2.3", preview=False)
calls = [c.args[0] for c in mock_run_git_command.call_args_list]
self.assertIn(
'git tag -f -a latest v1.2.3^{} -m "Floating latest tag for v1.2.3"',
calls,
)
self.assertIn("git push origin latest --force", calls)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,68 @@
from __future__ import annotations
import unittest
from unittest.mock import patch
from pkgmgr.actions.release.git_ops import ensure_clean_and_synced
class TestEnsureCleanAndSynced(unittest.TestCase):
@patch("pkgmgr.actions.release.git_ops.pull_ff_only")
@patch("pkgmgr.actions.release.git_ops.fetch")
@patch("pkgmgr.actions.release.git_ops.get_upstream_ref")
def test_no_upstream_skips(
self,
mock_get_upstream_ref,
mock_fetch,
mock_pull_ff_only,
) -> None:
mock_get_upstream_ref.return_value = None
ensure_clean_and_synced(preview=False)
mock_fetch.assert_not_called()
mock_pull_ff_only.assert_not_called()
@patch("pkgmgr.actions.release.git_ops.pull_ff_only")
@patch("pkgmgr.actions.release.git_ops.fetch")
@patch("pkgmgr.actions.release.git_ops.get_upstream_ref")
def test_preview_calls_commands_with_preview_true(
self,
mock_get_upstream_ref,
mock_fetch,
mock_pull_ff_only,
) -> None:
mock_get_upstream_ref.return_value = "origin/main"
ensure_clean_and_synced(preview=True)
mock_fetch.assert_called_once_with(
remote="origin",
prune=True,
tags=True,
force=True,
preview=True,
)
mock_pull_ff_only.assert_called_once_with(preview=True)
@patch("pkgmgr.actions.release.git_ops.pull_ff_only")
@patch("pkgmgr.actions.release.git_ops.fetch")
@patch("pkgmgr.actions.release.git_ops.get_upstream_ref")
def test_real_calls_commands_with_preview_false(
self,
mock_get_upstream_ref,
mock_fetch,
mock_pull_ff_only,
) -> None:
mock_get_upstream_ref.return_value = "origin/main"
ensure_clean_and_synced(preview=False)
mock_fetch.assert_called_once_with(
remote="origin",
prune=True,
tags=True,
force=True,
preview=False,
)
mock_pull_ff_only.assert_called_once_with(preview=False)

View File

@@ -0,0 +1,40 @@
from __future__ import annotations
import unittest
from unittest.mock import patch
from pkgmgr.actions.release.git_ops import is_highest_version_tag
class TestIsHighestVersionTag(unittest.TestCase):
@patch("pkgmgr.actions.release.git_ops.list_tags")
def test_no_tags_returns_true(self, mock_list_tags) -> None:
mock_list_tags.return_value = []
self.assertTrue(is_highest_version_tag("v1.0.0"))
mock_list_tags.assert_called_once_with("v*")
@patch("pkgmgr.actions.release.git_ops.list_tags")
def test_parseable_semver_compares_correctly(self, mock_list_tags) -> None:
# Highest is v1.10.0 (semantic compare)
mock_list_tags.return_value = ["v1.0.0", "v1.2.0", "v1.10.0"]
self.assertTrue(is_highest_version_tag("v1.10.0"))
self.assertFalse(is_highest_version_tag("v1.2.0"))
self.assertFalse(is_highest_version_tag("v1.0.0"))
@patch("pkgmgr.actions.release.git_ops.list_tags")
def test_ignores_non_parseable_v_tags_for_semver_compare(self, mock_list_tags) -> None:
mock_list_tags.return_value = ["v1.2.0", "v1.10.0", "v1.2.0-rc1", "vfoo"]
self.assertTrue(is_highest_version_tag("v1.10.0"))
self.assertFalse(is_highest_version_tag("v1.2.0"))
@patch("pkgmgr.actions.release.git_ops.list_tags")
def test_current_tag_not_parseable_falls_back_to_lex_compare(self, mock_list_tags) -> None:
mock_list_tags.return_value = ["v1.9.0", "v1.10.0"]
# prerelease must NOT outrank the final release
self.assertFalse(is_highest_version_tag("v1.10.0-rc1"))
self.assertFalse(is_highest_version_tag("v1.0.0-rc1"))

View File

@@ -0,0 +1,52 @@
from __future__ import annotations
import unittest
from unittest.mock import patch
from pkgmgr.actions.release.git_ops import update_latest_tag
class TestUpdateLatestTag(unittest.TestCase):
@patch("pkgmgr.actions.release.git_ops.push")
@patch("pkgmgr.actions.release.git_ops.tag_force_annotated")
def test_preview_calls_commands_with_preview_true(
self,
mock_tag_force_annotated,
mock_push,
) -> None:
update_latest_tag("v1.2.3", preview=True)
mock_tag_force_annotated.assert_called_once_with(
name="latest",
target="v1.2.3^{}",
message="Floating latest tag for v1.2.3",
preview=True,
)
mock_push.assert_called_once_with(
"origin",
"latest",
force=True,
preview=True,
)
@patch("pkgmgr.actions.release.git_ops.push")
@patch("pkgmgr.actions.release.git_ops.tag_force_annotated")
def test_real_calls_commands_with_preview_false(
self,
mock_tag_force_annotated,
mock_push,
) -> None:
update_latest_tag("v1.2.3", preview=False)
mock_tag_force_annotated.assert_called_once_with(
name="latest",
target="v1.2.3^{}",
message="Floating latest tag for v1.2.3",
preview=False,
)
mock_push.assert_called_once_with(
"origin",
"latest",
force=True,
preview=False,
)

View File

@@ -1,23 +1,27 @@
# tests/test_clone_repos.py # tests/unit/pkgmgr/actions/repos/test_clone.py
from __future__ import annotations
import unittest import unittest
from unittest.mock import patch, MagicMock from unittest.mock import patch
from pkgmgr.actions.repository.clone import clone_repos from pkgmgr.actions.repository.clone import clone_repos
class TestCloneRepos(unittest.TestCase): class TestCloneRepos(unittest.TestCase):
def setUp(self): def setUp(self) -> None:
# Add `verified` so verify_repository() is actually exercised.
self.repo = { self.repo = {
"provider": "github.com", "provider": "github.com",
"account": "user", "account": "user",
"repository": "repo", "repository": "repo",
"verified": {"commit": "deadbeef"},
} }
self.selected = [self.repo] self.selected = [self.repo]
self.base_dir = "/tmp/repos" self.base_dir = "/tmp/repos"
self.all_repos = self.selected self.all_repos = self.selected
@patch("pkgmgr.actions.repository.clone.verify_repository") @patch("pkgmgr.actions.repository.clone.verify_repository")
@patch("pkgmgr.actions.repository.clone.subprocess.run") @patch("pkgmgr.actions.repository.clone.git_clone")
@patch("pkgmgr.actions.repository.clone.os.makedirs") @patch("pkgmgr.actions.repository.clone.os.makedirs")
@patch("pkgmgr.actions.repository.clone.os.path.exists") @patch("pkgmgr.actions.repository.clone.os.path.exists")
@patch("pkgmgr.actions.repository.clone.get_repo_dir") @patch("pkgmgr.actions.repository.clone.get_repo_dir")
@@ -28,13 +32,14 @@ class TestCloneRepos(unittest.TestCase):
mock_get_repo_dir, mock_get_repo_dir,
mock_exists, mock_exists,
mock_makedirs, mock_makedirs,
mock_run, mock_git_clone,
mock_verify, mock_verify,
): ) -> None:
mock_get_repo_identifier.return_value = "github.com/user/repo" mock_get_repo_identifier.return_value = "github.com/user/repo"
mock_get_repo_dir.return_value = "/tmp/repos/user/repo" mock_get_repo_dir.return_value = "/tmp/repos/user/repo"
mock_exists.return_value = False mock_exists.return_value = False
mock_run.return_value = MagicMock(returncode=0)
# verification called; and because no_verification=True, result doesn't matter
mock_verify.return_value = (True, [], "hash", "key") mock_verify.return_value = (True, [], "hash", "key")
clone_repos( clone_repos(
@@ -46,17 +51,26 @@ class TestCloneRepos(unittest.TestCase):
clone_mode="ssh", clone_mode="ssh",
) )
mock_run.assert_called_once() mock_git_clone.assert_called_once()
# subprocess.run wird mit positional args aufgerufen args, kwargs = mock_git_clone.call_args
cmd = mock_run.call_args[0][0] clone_args = args[0]
cwd = mock_run.call_args[1]["cwd"] self.assertEqual(
clone_args,
["git@github.com:user/repo.git", "/tmp/repos/user/repo"],
)
self.assertEqual(kwargs["cwd"], "/tmp/repos/user")
self.assertFalse(kwargs["preview"])
self.assertIn("git clone", cmd) # verify_repository should be called because repo has "verified"
self.assertIn("git@github.com:user/repo.git", cmd) mock_verify.assert_called_once()
self.assertEqual(cwd, "/tmp/repos/user") v_args, v_kwargs = mock_verify.call_args
self.assertEqual(v_args[0], self.repo) # repo dict
self.assertEqual(v_args[1], "/tmp/repos/user/repo") # repo_dir
self.assertEqual(v_kwargs["mode"], "local")
self.assertTrue(v_kwargs["no_verification"])
@patch("pkgmgr.actions.repository.clone.verify_repository") @patch("pkgmgr.actions.repository.clone.verify_repository")
@patch("pkgmgr.actions.repository.clone.subprocess.run") @patch("pkgmgr.actions.repository.clone.git_clone")
@patch("pkgmgr.actions.repository.clone.os.makedirs") @patch("pkgmgr.actions.repository.clone.os.makedirs")
@patch("pkgmgr.actions.repository.clone.os.path.exists") @patch("pkgmgr.actions.repository.clone.os.path.exists")
@patch("pkgmgr.actions.repository.clone.get_repo_dir") @patch("pkgmgr.actions.repository.clone.get_repo_dir")
@@ -67,13 +81,12 @@ class TestCloneRepos(unittest.TestCase):
mock_get_repo_dir, mock_get_repo_dir,
mock_exists, mock_exists,
mock_makedirs, mock_makedirs,
mock_run, mock_git_clone,
mock_verify, mock_verify,
): ) -> None:
mock_get_repo_identifier.return_value = "github.com/user/repo" mock_get_repo_identifier.return_value = "github.com/user/repo"
mock_get_repo_dir.return_value = "/tmp/repos/user/repo" mock_get_repo_dir.return_value = "/tmp/repos/user/repo"
mock_exists.return_value = False mock_exists.return_value = False
mock_run.return_value = MagicMock(returncode=0)
mock_verify.return_value = (True, [], "hash", "key") mock_verify.return_value = (True, [], "hash", "key")
clone_repos( clone_repos(
@@ -85,16 +98,20 @@ class TestCloneRepos(unittest.TestCase):
clone_mode="https", clone_mode="https",
) )
mock_run.assert_called_once() mock_git_clone.assert_called_once()
cmd = mock_run.call_args[0][0] args, kwargs = mock_git_clone.call_args
cwd = mock_run.call_args[1]["cwd"] clone_args = args[0]
self.assertEqual(
clone_args,
["https://github.com/user/repo.git", "/tmp/repos/user/repo"],
)
self.assertEqual(kwargs["cwd"], "/tmp/repos/user")
self.assertFalse(kwargs["preview"])
self.assertIn("git clone", cmd) mock_verify.assert_called_once()
self.assertIn("https://github.com/user/repo.git", cmd)
self.assertEqual(cwd, "/tmp/repos/user")
@patch("pkgmgr.actions.repository.clone.verify_repository") @patch("pkgmgr.actions.repository.clone.verify_repository")
@patch("pkgmgr.actions.repository.clone.subprocess.run") @patch("pkgmgr.actions.repository.clone.git_clone")
@patch("pkgmgr.actions.repository.clone.os.makedirs") @patch("pkgmgr.actions.repository.clone.os.makedirs")
@patch("pkgmgr.actions.repository.clone.os.path.exists") @patch("pkgmgr.actions.repository.clone.os.path.exists")
@patch("pkgmgr.actions.repository.clone.get_repo_dir") @patch("pkgmgr.actions.repository.clone.get_repo_dir")
@@ -105,13 +122,12 @@ class TestCloneRepos(unittest.TestCase):
mock_get_repo_dir, mock_get_repo_dir,
mock_exists, mock_exists,
mock_makedirs, mock_makedirs,
mock_run, mock_git_clone,
mock_verify, mock_verify,
): ) -> None:
mock_get_repo_identifier.return_value = "github.com/user/repo" mock_get_repo_identifier.return_value = "github.com/user/repo"
mock_get_repo_dir.return_value = "/tmp/repos/user/repo" mock_get_repo_dir.return_value = "/tmp/repos/user/repo"
mock_exists.return_value = False mock_exists.return_value = False
mock_run.return_value = MagicMock(returncode=0)
mock_verify.return_value = (True, [], "hash", "key") mock_verify.return_value = (True, [], "hash", "key")
clone_repos( clone_repos(
@@ -123,29 +139,39 @@ class TestCloneRepos(unittest.TestCase):
clone_mode="shallow", clone_mode="shallow",
) )
mock_run.assert_called_once() mock_git_clone.assert_called_once()
cmd = mock_run.call_args[0][0] args, kwargs = mock_git_clone.call_args
cwd = mock_run.call_args[1]["cwd"] clone_args = args[0]
self.assertEqual(
clone_args,
[
"--depth",
"1",
"--single-branch",
"https://github.com/user/repo.git",
"/tmp/repos/user/repo",
],
)
self.assertEqual(kwargs["cwd"], "/tmp/repos/user")
self.assertFalse(kwargs["preview"])
self.assertIn("git clone --depth 1 --single-branch", cmd) mock_verify.assert_called_once()
self.assertIn("https://github.com/user/repo.git", cmd)
self.assertEqual(cwd, "/tmp/repos/user")
@patch("pkgmgr.actions.repository.clone.verify_repository") @patch("pkgmgr.actions.repository.clone.verify_repository")
@patch("pkgmgr.actions.repository.clone.subprocess.run") @patch("pkgmgr.actions.repository.clone.git_clone")
@patch("pkgmgr.actions.repository.clone.os.makedirs") @patch("pkgmgr.actions.repository.clone.os.makedirs")
@patch("pkgmgr.actions.repository.clone.os.path.exists") @patch("pkgmgr.actions.repository.clone.os.path.exists")
@patch("pkgmgr.actions.repository.clone.get_repo_dir") @patch("pkgmgr.actions.repository.clone.get_repo_dir")
@patch("pkgmgr.actions.repository.clone.get_repo_identifier") @patch("pkgmgr.actions.repository.clone.get_repo_identifier")
def test_preview_mode_does_not_call_subprocess_run( def test_preview_mode_calls_git_clone_with_preview_true(
self, self,
mock_get_repo_identifier, mock_get_repo_identifier,
mock_get_repo_dir, mock_get_repo_dir,
mock_exists, mock_exists,
mock_makedirs, mock_makedirs,
mock_run, mock_git_clone,
mock_verify, mock_verify,
): ) -> None:
mock_get_repo_identifier.return_value = "github.com/user/repo" mock_get_repo_identifier.return_value = "github.com/user/repo"
mock_get_repo_dir.return_value = "/tmp/repos/user/repo" mock_get_repo_dir.return_value = "/tmp/repos/user/repo"
mock_exists.return_value = False mock_exists.return_value = False
@@ -160,8 +186,153 @@ class TestCloneRepos(unittest.TestCase):
clone_mode="shallow", clone_mode="shallow",
) )
# Im Preview-Modus sollte subprocess.run nicht aufgerufen werden mock_git_clone.assert_called_once()
mock_run.assert_not_called() _args, kwargs = mock_git_clone.call_args
self.assertTrue(kwargs["preview"])
# Even in preview, verification is reached (because repo has "verified"),
# but no_verification=True makes it non-blocking.
mock_verify.assert_called_once()
@patch("builtins.input", return_value="y")
@patch("pkgmgr.actions.repository.clone.verify_repository")
@patch("pkgmgr.actions.repository.clone.git_clone")
@patch("pkgmgr.actions.repository.clone.os.makedirs")
@patch("pkgmgr.actions.repository.clone.os.path.exists")
@patch("pkgmgr.actions.repository.clone.get_repo_dir")
@patch("pkgmgr.actions.repository.clone.get_repo_identifier")
def test_ssh_clone_failure_prompts_and_falls_back_to_https_when_confirmed(
self,
mock_get_repo_identifier,
mock_get_repo_dir,
mock_exists,
mock_makedirs,
mock_git_clone,
mock_verify,
mock_input,
) -> None:
mock_get_repo_identifier.return_value = "github.com/user/repo"
mock_get_repo_dir.return_value = "/tmp/repos/user/repo"
mock_exists.return_value = False
mock_verify.return_value = (True, [], "hash", "key")
# First call (ssh) fails, second call (https) succeeds
from pkgmgr.core.git.commands.clone import GitCloneError
mock_git_clone.side_effect = [
GitCloneError("ssh failed", cwd="/tmp/repos/user"),
None,
]
clone_repos(
self.selected,
self.base_dir,
self.all_repos,
preview=False,
no_verification=True,
clone_mode="ssh",
)
self.assertEqual(mock_git_clone.call_count, 2)
first_args, first_kwargs = mock_git_clone.call_args_list[0]
self.assertEqual(
first_args[0],
["git@github.com:user/repo.git", "/tmp/repos/user/repo"],
)
self.assertEqual(first_kwargs["cwd"], "/tmp/repos/user")
self.assertFalse(first_kwargs["preview"])
second_args, second_kwargs = mock_git_clone.call_args_list[1]
self.assertEqual(
second_args[0],
["https://github.com/user/repo.git", "/tmp/repos/user/repo"],
)
self.assertEqual(second_kwargs["cwd"], "/tmp/repos/user")
self.assertFalse(second_kwargs["preview"])
mock_input.assert_called_once()
mock_verify.assert_called_once()
@patch("builtins.input", return_value="n")
@patch("pkgmgr.actions.repository.clone.verify_repository")
@patch("pkgmgr.actions.repository.clone.git_clone")
@patch("pkgmgr.actions.repository.clone.os.makedirs")
@patch("pkgmgr.actions.repository.clone.os.path.exists")
@patch("pkgmgr.actions.repository.clone.get_repo_dir")
@patch("pkgmgr.actions.repository.clone.get_repo_identifier")
def test_ssh_clone_failure_does_not_fallback_when_declined(
self,
mock_get_repo_identifier,
mock_get_repo_dir,
mock_exists,
mock_makedirs,
mock_git_clone,
mock_verify,
mock_input,
) -> None:
mock_get_repo_identifier.return_value = "github.com/user/repo"
mock_get_repo_dir.return_value = "/tmp/repos/user/repo"
mock_exists.return_value = False
from pkgmgr.core.git.commands.clone import GitCloneError
mock_git_clone.side_effect = GitCloneError("ssh failed", cwd="/tmp/repos/user")
clone_repos(
self.selected,
self.base_dir,
self.all_repos,
preview=False,
no_verification=True,
clone_mode="ssh",
)
mock_git_clone.assert_called_once()
mock_input.assert_called_once()
# If fallback is declined, verification should NOT run (repo was not cloned)
mock_verify.assert_not_called()
@patch("builtins.input", return_value="n")
@patch("pkgmgr.actions.repository.clone.verify_repository")
@patch("pkgmgr.actions.repository.clone.git_clone")
@patch("pkgmgr.actions.repository.clone.os.makedirs")
@patch("pkgmgr.actions.repository.clone.os.path.exists")
@patch("pkgmgr.actions.repository.clone.get_repo_dir")
@patch("pkgmgr.actions.repository.clone.get_repo_identifier")
def test_verification_failure_prompts_and_skips_when_user_declines(
self,
mock_get_repo_identifier,
mock_get_repo_dir,
mock_exists,
mock_makedirs,
mock_git_clone,
mock_verify,
mock_input,
) -> None:
mock_get_repo_identifier.return_value = "github.com/user/repo"
mock_get_repo_dir.return_value = "/tmp/repos/user/repo"
mock_exists.return_value = False
# Clone succeeds
mock_git_clone.return_value = None
# Verification fails, and user answers "n" to proceed anyway
mock_verify.return_value = (False, ["bad signature"], "hash", "key")
clone_repos(
self.selected,
self.base_dir,
self.all_repos,
preview=False,
no_verification=False,
clone_mode="https",
)
mock_git_clone.assert_called_once()
mock_verify.assert_called_once()
mock_input.assert_called_once()
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -1,6 +1,6 @@
import io import io
import unittest import unittest
from unittest.mock import patch, MagicMock from unittest.mock import patch
from pkgmgr.actions.repository.pull import pull_with_verification from pkgmgr.actions.repository.pull import pull_with_verification
@@ -12,14 +12,23 @@ class TestPullWithVerification(unittest.TestCase):
These tests verify: These tests verify:
- Preview mode behaviour - Preview mode behaviour
- Verification logic (prompting, bypassing, skipping) - Verification logic (prompting, bypassing, skipping)
- subprocess.run invocation - pull_args invocation (instead of subprocess.run)
- Repository directory existence checks - Repository directory existence checks
- Handling of extra git pull arguments - Handling of extra git pull arguments
""" """
def _setup_mocks(self, mock_exists, mock_get_repo_id, mock_get_repo_dir, def _setup_mocks(
mock_verify, exists=True, verified_ok=True, self,
errors=None, verified_info=True): mock_exists,
mock_get_repo_id,
mock_get_repo_dir,
mock_verify,
*,
exists: bool = True,
verified_ok: bool = True,
errors=None,
verified_info: bool = True,
):
"""Helper to configure repetitive mock behavior.""" """Helper to configure repetitive mock behavior."""
repo = { repo = {
"name": "pkgmgr", "name": "pkgmgr",
@@ -37,7 +46,7 @@ class TestPullWithVerification(unittest.TestCase):
return repo return repo
# --------------------------------------------------------------------- # ---------------------------------------------------------------------
@patch("pkgmgr.actions.repository.pull.subprocess.run") @patch("pkgmgr.actions.repository.pull.pull_args")
@patch("pkgmgr.actions.repository.pull.verify_repository") @patch("pkgmgr.actions.repository.pull.verify_repository")
@patch("pkgmgr.actions.repository.pull.get_repo_dir") @patch("pkgmgr.actions.repository.pull.get_repo_dir")
@patch("pkgmgr.actions.repository.pull.get_repo_identifier") @patch("pkgmgr.actions.repository.pull.get_repo_identifier")
@@ -50,11 +59,11 @@ class TestPullWithVerification(unittest.TestCase):
mock_get_repo_id, mock_get_repo_id,
mock_get_repo_dir, mock_get_repo_dir,
mock_verify, mock_verify,
mock_subprocess, mock_pull_args,
): ):
""" """
Preview mode must NEVER request user input and must NEVER execute git. Preview mode must NEVER request user input and must still call pull_args
It must only print the preview command. in preview mode (which prints the preview command via core.git.run()).
""" """
repo = self._setup_mocks( repo = self._setup_mocks(
mock_exists, mock_exists,
@@ -78,17 +87,15 @@ class TestPullWithVerification(unittest.TestCase):
preview=True, preview=True,
) )
output = buf.getvalue() mock_input.assert_not_called()
self.assertIn( mock_pull_args.assert_called_once_with(
"[Preview] In '/fake/base/pkgmgr': git pull --ff-only", ["--ff-only"],
output, cwd="/fake/base/pkgmgr",
preview=True,
) )
mock_input.assert_not_called()
mock_subprocess.assert_not_called()
# --------------------------------------------------------------------- # ---------------------------------------------------------------------
@patch("pkgmgr.actions.repository.pull.subprocess.run") @patch("pkgmgr.actions.repository.pull.pull_args")
@patch("pkgmgr.actions.repository.pull.verify_repository") @patch("pkgmgr.actions.repository.pull.verify_repository")
@patch("pkgmgr.actions.repository.pull.get_repo_dir") @patch("pkgmgr.actions.repository.pull.get_repo_dir")
@patch("pkgmgr.actions.repository.pull.get_repo_identifier") @patch("pkgmgr.actions.repository.pull.get_repo_identifier")
@@ -101,7 +108,7 @@ class TestPullWithVerification(unittest.TestCase):
mock_get_repo_id, mock_get_repo_id,
mock_get_repo_dir, mock_get_repo_dir,
mock_verify, mock_verify,
mock_subprocess, mock_pull_args,
): ):
""" """
If verification fails and preview=False, the user is prompted. If verification fails and preview=False, the user is prompted.
@@ -118,8 +125,6 @@ class TestPullWithVerification(unittest.TestCase):
mock_input.return_value = "n" mock_input.return_value = "n"
buf = io.StringIO()
with patch("sys.stdout", new=buf):
pull_with_verification( pull_with_verification(
selected_repos=[repo], selected_repos=[repo],
repositories_base_dir="/fake/base", repositories_base_dir="/fake/base",
@@ -130,10 +135,10 @@ class TestPullWithVerification(unittest.TestCase):
) )
mock_input.assert_called_once() mock_input.assert_called_once()
mock_subprocess.assert_not_called() mock_pull_args.assert_not_called()
# --------------------------------------------------------------------- # ---------------------------------------------------------------------
@patch("pkgmgr.actions.repository.pull.subprocess.run") @patch("pkgmgr.actions.repository.pull.pull_args")
@patch("pkgmgr.actions.repository.pull.verify_repository") @patch("pkgmgr.actions.repository.pull.verify_repository")
@patch("pkgmgr.actions.repository.pull.get_repo_dir") @patch("pkgmgr.actions.repository.pull.get_repo_dir")
@patch("pkgmgr.actions.repository.pull.get_repo_identifier") @patch("pkgmgr.actions.repository.pull.get_repo_identifier")
@@ -146,11 +151,11 @@ class TestPullWithVerification(unittest.TestCase):
mock_get_repo_id, mock_get_repo_id,
mock_get_repo_dir, mock_get_repo_dir,
mock_verify, mock_verify,
mock_subprocess, mock_pull_args,
): ):
""" """
If verification fails and the user accepts ('y'), If verification fails and the user accepts ('y'),
then the git pull should be executed. then the git pull should be executed via pull_args.
""" """
repo = self._setup_mocks( repo = self._setup_mocks(
mock_exists, mock_exists,
@@ -162,7 +167,6 @@ class TestPullWithVerification(unittest.TestCase):
) )
mock_input.return_value = "y" mock_input.return_value = "y"
mock_subprocess.return_value = MagicMock(returncode=0)
pull_with_verification( pull_with_verification(
selected_repos=[repo], selected_repos=[repo],
@@ -173,11 +177,15 @@ class TestPullWithVerification(unittest.TestCase):
preview=False, preview=False,
) )
mock_subprocess.assert_called_once()
mock_input.assert_called_once() mock_input.assert_called_once()
mock_pull_args.assert_called_once_with(
[],
cwd="/fake/base/pkgmgr",
preview=False,
)
# --------------------------------------------------------------------- # ---------------------------------------------------------------------
@patch("pkgmgr.actions.repository.pull.subprocess.run") @patch("pkgmgr.actions.repository.pull.pull_args")
@patch("pkgmgr.actions.repository.pull.verify_repository") @patch("pkgmgr.actions.repository.pull.verify_repository")
@patch("pkgmgr.actions.repository.pull.get_repo_dir") @patch("pkgmgr.actions.repository.pull.get_repo_dir")
@patch("pkgmgr.actions.repository.pull.get_repo_identifier") @patch("pkgmgr.actions.repository.pull.get_repo_identifier")
@@ -190,7 +198,7 @@ class TestPullWithVerification(unittest.TestCase):
mock_get_repo_id, mock_get_repo_id,
mock_get_repo_dir, mock_get_repo_dir,
mock_verify, mock_verify,
mock_subprocess, mock_pull_args,
): ):
""" """
If verification is successful, the user should NOT be prompted, If verification is successful, the user should NOT be prompted,
@@ -204,8 +212,6 @@ class TestPullWithVerification(unittest.TestCase):
verified_ok=True, verified_ok=True,
) )
mock_subprocess.return_value = MagicMock(returncode=0)
pull_with_verification( pull_with_verification(
selected_repos=[repo], selected_repos=[repo],
repositories_base_dir="/fake/base", repositories_base_dir="/fake/base",
@@ -216,12 +222,14 @@ class TestPullWithVerification(unittest.TestCase):
) )
mock_input.assert_not_called() mock_input.assert_not_called()
mock_subprocess.assert_called_once() mock_pull_args.assert_called_once_with(
cmd = mock_subprocess.call_args[0][0] ["--rebase"],
self.assertIn("git pull --rebase", cmd) cwd="/fake/base/pkgmgr",
preview=False,
)
# --------------------------------------------------------------------- # ---------------------------------------------------------------------
@patch("pkgmgr.actions.repository.pull.subprocess.run") @patch("pkgmgr.actions.repository.pull.pull_args")
@patch("pkgmgr.actions.repository.pull.verify_repository") @patch("pkgmgr.actions.repository.pull.verify_repository")
@patch("pkgmgr.actions.repository.pull.get_repo_dir") @patch("pkgmgr.actions.repository.pull.get_repo_dir")
@patch("pkgmgr.actions.repository.pull.get_repo_identifier") @patch("pkgmgr.actions.repository.pull.get_repo_identifier")
@@ -234,11 +242,11 @@ class TestPullWithVerification(unittest.TestCase):
mock_get_repo_id, mock_get_repo_id,
mock_get_repo_dir, mock_get_repo_dir,
mock_verify, mock_verify,
mock_subprocess, mock_pull_args,
): ):
""" """
If the repository directory does not exist, the repo must be skipped If the repository directory does not exist, the repo must be skipped
silently and no git command executed. and no git command executed.
""" """
repo = self._setup_mocks( repo = self._setup_mocks(
mock_exists, mock_exists,
@@ -263,10 +271,10 @@ class TestPullWithVerification(unittest.TestCase):
self.assertIn("not found", output) self.assertIn("not found", output)
mock_input.assert_not_called() mock_input.assert_not_called()
mock_subprocess.assert_not_called() mock_pull_args.assert_not_called()
# --------------------------------------------------------------------- # ---------------------------------------------------------------------
@patch("pkgmgr.actions.repository.pull.subprocess.run") @patch("pkgmgr.actions.repository.pull.pull_args")
@patch("pkgmgr.actions.repository.pull.verify_repository") @patch("pkgmgr.actions.repository.pull.verify_repository")
@patch("pkgmgr.actions.repository.pull.get_repo_dir") @patch("pkgmgr.actions.repository.pull.get_repo_dir")
@patch("pkgmgr.actions.repository.pull.get_repo_identifier") @patch("pkgmgr.actions.repository.pull.get_repo_identifier")
@@ -279,7 +287,7 @@ class TestPullWithVerification(unittest.TestCase):
mock_get_repo_id, mock_get_repo_id,
mock_get_repo_dir, mock_get_repo_dir,
mock_verify, mock_verify,
mock_subprocess, mock_pull_args,
): ):
""" """
If no_verification=True, verification failures must NOT prompt. If no_verification=True, verification failures must NOT prompt.
@@ -294,8 +302,6 @@ class TestPullWithVerification(unittest.TestCase):
errors=["invalid"], errors=["invalid"],
) )
mock_subprocess.return_value = MagicMock(returncode=0)
pull_with_verification( pull_with_verification(
selected_repos=[repo], selected_repos=[repo],
repositories_base_dir="/fake/base", repositories_base_dir="/fake/base",
@@ -306,4 +312,8 @@ class TestPullWithVerification(unittest.TestCase):
) )
mock_input.assert_not_called() mock_input.assert_not_called()
mock_subprocess.assert_called_once() mock_pull_args.assert_called_once_with(
[],
cwd="/fake/base/pkgmgr",
preview=False,
)

View File

@@ -2,9 +2,9 @@ from __future__ import annotations
import unittest import unittest
from pkgmgr.actions.repository.create import ( from pkgmgr.actions.repository.create.model import RepoParts
RepoParts, from pkgmgr.actions.repository.create.parser import (
_parse_identifier, parse_identifier,
_parse_git_url, _parse_git_url,
_strip_git_suffix, _strip_git_suffix,
_split_host_port, _split_host_port,
@@ -22,7 +22,7 @@ class TestRepositoryCreateParsing(unittest.TestCase):
self.assertEqual(_split_host_port("example.com:"), ("example.com", None)) self.assertEqual(_split_host_port("example.com:"), ("example.com", None))
def test_parse_identifier_plain(self) -> None: def test_parse_identifier_plain(self) -> None:
parts = _parse_identifier("github.com/owner/repo") parts = parse_identifier("github.com/owner/repo")
self.assertIsInstance(parts, RepoParts) self.assertIsInstance(parts, RepoParts)
self.assertEqual(parts.host, "github.com") self.assertEqual(parts.host, "github.com")
self.assertEqual(parts.port, None) self.assertEqual(parts.port, None)
@@ -30,7 +30,7 @@ class TestRepositoryCreateParsing(unittest.TestCase):
self.assertEqual(parts.name, "repo") self.assertEqual(parts.name, "repo")
def test_parse_identifier_with_port(self) -> None: def test_parse_identifier_with_port(self) -> None:
parts = _parse_identifier("gitea.example.com:2222/org/repo") parts = parse_identifier("gitea.example.com:2222/org/repo")
self.assertEqual(parts.host, "gitea.example.com") self.assertEqual(parts.host, "gitea.example.com")
self.assertEqual(parts.port, "2222") self.assertEqual(parts.port, "2222")
self.assertEqual(parts.owner, "org") self.assertEqual(parts.owner, "org")

View File

@@ -0,0 +1,43 @@
from __future__ import annotations
import unittest
from unittest.mock import patch
from pkgmgr.actions.repository.create.templates import TemplateRenderer
class TestTemplateRendererPreview(unittest.TestCase):
def test_render_preview_does_not_write(self) -> None:
# Ensure TemplateRenderer does not try to resolve real repo root.
with (
patch(
"pkgmgr.actions.repository.create.templates.TemplateRenderer._resolve_templates_dir",
return_value="/tpl",
),
patch(
"pkgmgr.actions.repository.create.templates.os.walk",
return_value=[("/tpl", [], ["README.md.j2"])],
),
patch(
"pkgmgr.actions.repository.create.templates.os.path.relpath",
return_value="README.md.j2",
),
patch("pkgmgr.actions.repository.create.templates.os.makedirs") as mk,
patch("pkgmgr.actions.repository.create.templates.open", create=True) as op,
patch("pkgmgr.actions.repository.create.templates.Environment") as env_cls,
):
renderer = TemplateRenderer()
renderer.render(
repo_dir="/repo",
context={"repository": "x"},
preview=True,
)
mk.assert_not_called()
op.assert_not_called()
env_cls.assert_not_called()
if __name__ == "__main__":
unittest.main()

View File

@@ -1,35 +0,0 @@
from __future__ import annotations
import unittest
from unittest.mock import patch
from pkgmgr.actions.repository.scaffold import render_default_templates
class TestScaffoldRenderPreview(unittest.TestCase):
def test_render_preview_does_not_write(self) -> None:
with (
patch("pkgmgr.actions.repository.scaffold._templates_dir", return_value="/tpl"),
patch("pkgmgr.actions.repository.scaffold.os.path.isdir", return_value=True),
patch("pkgmgr.actions.repository.scaffold.os.walk", return_value=[("/tpl", [], ["README.md.j2"])]),
patch("pkgmgr.actions.repository.scaffold.os.path.relpath", return_value="README.md.j2"),
patch("pkgmgr.actions.repository.scaffold.os.makedirs") as mk,
patch("pkgmgr.actions.repository.scaffold.open", create=True) as op,
patch("pkgmgr.actions.repository.scaffold.Environment") as env_cls,
):
env = env_cls.return_value
env.get_template.return_value.render.return_value = "X"
render_default_templates(
"/repo",
context={"repository": "x"},
preview=True,
)
mk.assert_not_called()
op.assert_not_called()
env.get_template.assert_not_called()
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,69 @@
import unittest
from unittest.mock import MagicMock, patch
from pkgmgr.core.git.errors import GitError
from pkgmgr.core.git.run import run
class TestGitRun(unittest.TestCase):
def test_preview_mode_prints_and_does_not_execute(self) -> None:
with patch("pkgmgr.core.git.run.subprocess.run") as mock_run, patch(
"builtins.print"
) as mock_print:
out = run(["status"], cwd="/tmp/repo", preview=True)
self.assertEqual(out, "")
mock_run.assert_not_called()
mock_print.assert_called_once()
printed = mock_print.call_args[0][0]
self.assertIn("[PREVIEW] Would run in '/tmp/repo': git status", printed)
def test_success_returns_stripped_stdout(self) -> None:
completed = MagicMock()
completed.stdout = " hello world \n"
completed.stderr = ""
completed.returncode = 0
with patch("pkgmgr.core.git.run.subprocess.run", return_value=completed) as mock_run:
out = run(["rev-parse", "HEAD"], cwd="/repo", preview=False)
self.assertEqual(out, "hello world")
mock_run.assert_called_once()
args, kwargs = mock_run.call_args
self.assertEqual(args[0], ["git", "rev-parse", "HEAD"])
self.assertEqual(kwargs["cwd"], "/repo")
self.assertTrue(kwargs["check"])
self.assertTrue(kwargs["text"])
# ensure pipes are used (matches implementation intent)
self.assertIsNotNone(kwargs["stdout"])
self.assertIsNotNone(kwargs["stderr"])
def test_failure_raises_giterror_with_details(self) -> None:
# Build a CalledProcessError with stdout/stderr populated
import subprocess as sp
exc = sp.CalledProcessError(
returncode=128,
cmd=["git", "status"],
output="OUT!",
stderr="ERR!",
)
# Your implementation reads exc.stdout, but CalledProcessError stores it as .output
# in some cases. Ensure .stdout exists for deterministic behavior.
exc.stdout = "OUT!"
exc.stderr = "ERR!"
with patch("pkgmgr.core.git.run.subprocess.run", side_effect=exc):
with self.assertRaises(GitError) as ctx:
run(["status"], cwd="/bad/repo", preview=False)
msg = str(ctx.exception)
self.assertIn("Git command failed in '/bad/repo': git status", msg)
self.assertIn("Exit code: 128", msg)
self.assertIn("STDOUT:\nOUT!", msg)
self.assertIn("STDERR:\nERR!", msg)
if __name__ == "__main__":
unittest.main()