diff --git a/src/pkgmgr/core/credentials/__init__.py b/src/pkgmgr/core/credentials/__init__.py index 59b1592..2bf1bbc 100644 --- a/src/pkgmgr/core/credentials/__init__.py +++ b/src/pkgmgr/core/credentials/__init__.py @@ -1,4 +1,3 @@ -# src/pkgmgr/core/credentials/__init__.py """Credential resolution for provider APIs.""" from .resolver import ResolutionOptions, TokenResolver diff --git a/src/pkgmgr/core/credentials/providers/__init__.py b/src/pkgmgr/core/credentials/providers/__init__.py index baa8066..bdcf2bf 100644 --- a/src/pkgmgr/core/credentials/providers/__init__.py +++ b/src/pkgmgr/core/credentials/providers/__init__.py @@ -3,9 +3,11 @@ from .env import EnvTokenProvider from .keyring import KeyringTokenProvider from .prompt import PromptTokenProvider +from .gh import GhTokenProvider __all__ = [ "EnvTokenProvider", "KeyringTokenProvider", "PromptTokenProvider", + "GhTokenProvider", ] diff --git a/src/pkgmgr/core/credentials/providers/gh.py b/src/pkgmgr/core/credentials/providers/gh.py new file mode 100644 index 0000000..7140542 --- /dev/null +++ b/src/pkgmgr/core/credentials/providers/gh.py @@ -0,0 +1,43 @@ +from __future__ import annotations + +import shutil +import subprocess +from dataclasses import dataclass +from typing import Optional + +from ..types import TokenRequest, TokenResult + + +@dataclass(frozen=True) +class GhTokenProvider: + """ + Resolve a GitHub token via GitHub CLI (`gh auth token`). + + This does NOT persist anything; it only reads what `gh` already knows. + """ + source_name: str = "gh" + + def get(self, request: TokenRequest) -> Optional[TokenResult]: + # Only meaningful for GitHub-like providers + kind = (request.provider_kind or "").strip().lower() + if kind not in ("github", "github.com"): + return None + + if not shutil.which("gh"): + return None + + host = (request.host or "").strip() or "github.com" + + try: + out = subprocess.check_output( + ["gh", "auth", "token", "--hostname", host], + stderr=subprocess.STDOUT, + text=True, + ).strip() + except Exception: + return None + + if not out: + return None + + return TokenResult(token=out, source=self.source_name) diff --git a/src/pkgmgr/core/credentials/resolver.py b/src/pkgmgr/core/credentials/resolver.py index c0d633b..ee45b4a 100644 --- a/src/pkgmgr/core/credentials/resolver.py +++ b/src/pkgmgr/core/credentials/resolver.py @@ -6,9 +6,11 @@ from dataclasses import dataclass from typing import Optional from .providers.env import EnvTokenProvider +from .providers.gh import GhTokenProvider from .providers.keyring import KeyringTokenProvider from .providers.prompt import PromptTokenProvider from .types import KeyringUnavailableError, NoCredentialsError, TokenRequest, TokenResult +from .validate import validate_token @dataclass(frozen=True) @@ -21,10 +23,24 @@ class ResolutionOptions: class TokenResolver: - """Resolve tokens from multiple sources (ENV -> Keyring -> Prompt).""" + """ + Resolve tokens for provider APIs using the following policy: + + 0) ENV (explicit user intent) -> return as-is (do NOT persist) + 1) GitHub CLI (gh) -> if available and token validates, return + 2) Keyring -> if token validates, return; if invalid and + interactive prompting is allowed, prompt and + OVERWRITE the keyring entry + 3) Prompt -> prompt and (optionally) store in keyring + + Notes: + - Keyring requires python-keyring. + - Token validation is provider-specific (currently GitHub cloud). + """ def __init__(self) -> None: self._env = EnvTokenProvider() + self._gh = GhTokenProvider() self._keyring = KeyringTokenProvider() self._prompt = PromptTokenProvider() self._warned_keyring: bool = False @@ -48,6 +64,33 @@ class TokenResolver: print(" sudo dnf install python3-keyring", file=sys.stderr) print("", file=sys.stderr) + def _prompt_and_maybe_store( + self, + request: TokenRequest, + opts: ResolutionOptions, + ) -> Optional[TokenResult]: + """ + Prompt for a token and optionally store it in keyring. + If keyring is unavailable, still return the token for this run. + """ + if not (opts.interactive and opts.allow_prompt): + return None + + prompt_res = self._prompt.get(request) + if not prompt_res: + return None + + if opts.save_prompt_token_to_keyring: + try: + self._keyring.set(request, prompt_res.token) # overwrite is fine + except KeyringUnavailableError as exc: + self._warn_keyring_unavailable(exc) + except Exception: + # If keyring cannot store, still use token for this run. + pass + + return prompt_res + def get_token( self, provider_kind: str, @@ -58,16 +101,29 @@ class TokenResolver: opts = options or ResolutionOptions() request = TokenRequest(provider_kind=provider_kind, host=host, owner=owner) - # 1) ENV + # 0) ENV (highest priority; explicit user intent) env_res = self._env.get(request) if env_res: + # Do NOT validate or persist env tokens automatically. return env_res - # 2) Keyring + # 1) GitHub CLI (gh) (auto-read; validate) + gh_res = self._gh.get(request) + if gh_res and validate_token(request.provider_kind, request.host, gh_res.token): + return gh_res + + # 2) Keyring (validate; if invalid -> prompt + overwrite) try: kr_res = self._keyring.get(request) if kr_res: - return kr_res + if validate_token(request.provider_kind, request.host, kr_res.token): + return kr_res + + # Token exists but seems invalid -> re-prompt and overwrite keyring. + renewed = self._prompt_and_maybe_store(request, opts) + if renewed: + return renewed + except KeyringUnavailableError as exc: # Show a helpful warning once, then continue (prompt fallback). self._warn_keyring_unavailable(exc) @@ -76,21 +132,12 @@ class TokenResolver: pass # 3) Prompt (optional) - if opts.interactive and opts.allow_prompt: - prompt_res = self._prompt.get(request) - if prompt_res: - if opts.save_prompt_token_to_keyring: - try: - self._keyring.set(request, prompt_res.token) - except KeyringUnavailableError as exc: - self._warn_keyring_unavailable(exc) - except Exception: - # If keyring cannot store, still use token for this run. - pass - return prompt_res + prompt_res = self._prompt_and_maybe_store(request, opts) + if prompt_res: + return prompt_res raise NoCredentialsError( f"No token available for {provider_kind}@{host}" + (f" (owner: {owner})" if owner else "") - + ". Provide it via environment variable or keyring." + + ". Provide it via environment variable, keyring, or gh auth." ) diff --git a/src/pkgmgr/core/credentials/store_keys.py b/src/pkgmgr/core/credentials/store_keys.py index 64ae349..ff8bdff 100644 --- a/src/pkgmgr/core/credentials/store_keys.py +++ b/src/pkgmgr/core/credentials/store_keys.py @@ -44,6 +44,7 @@ def env_var_candidates(provider_kind: str, host: str, owner: Optional[str]) -> l candidates.append(f"PKGMGR_{kind}_TOKEN") candidates.append(f"PKGMGR_TOKEN_{kind}") candidates.append("PKGMGR_TOKEN") + return candidates diff --git a/src/pkgmgr/core/credentials/validate.py b/src/pkgmgr/core/credentials/validate.py new file mode 100644 index 0000000..8cd84d7 --- /dev/null +++ b/src/pkgmgr/core/credentials/validate.py @@ -0,0 +1,41 @@ +from __future__ import annotations + +from typing import Optional +import urllib.request +import json + + +def validate_token(provider_kind: str, host: str, token: str) -> bool: + """ + Return True if token appears valid for the provider. + Currently implemented for GitHub only. + """ + kind = (provider_kind or "").strip().lower() + host = (host or "").strip() or "github.com" + token = (token or "").strip() + if not token: + return False + + if kind in ("github", "github.com") and host.lower() == "github.com": + req = urllib.request.Request( + "https://api.github.com/user", + headers={ + "Authorization": f"Bearer {token}", + "Accept": "application/vnd.github+json", + "User-Agent": "pkgmgr", + }, + method="GET", + ) + try: + with urllib.request.urlopen(req, timeout=10) as resp: + if resp.status != 200: + return False + # Optional: parse to ensure body is JSON + _ = json.loads(resp.read().decode("utf-8")) + return True + except Exception: + return False + + # Unknown provider: don't hard-fail validation (conservative default) + # If you prefer strictness: return False here. + return True diff --git a/tests/integration/test_token_resolver_flow.py b/tests/integration/test_token_resolver_flow.py new file mode 100644 index 0000000..b49826b --- /dev/null +++ b/tests/integration/test_token_resolver_flow.py @@ -0,0 +1,106 @@ +from __future__ import annotations + +import unittest +from unittest.mock import patch + +from pkgmgr.core.credentials.resolver import TokenResolver +from pkgmgr.core.credentials.types import TokenResult + + +class TestTokenResolverIntegration(unittest.TestCase): + def test_full_resolution_flow_with_invalid_gh_and_keyring_then_prompt(self) -> None: + """ + Full integration scenario: + + - ENV provides nothing + - GitHub CLI (gh) is available and returns a token, but it is INVALID + - Keyring contains a token, but it is INVALID + - Interactive prompt provides a NEW token + - New token is ACCEPTED and OVERWRITES the keyring entry + """ + + resolver = TokenResolver() + + # ------------------------------------------------------------------ + # 1) ENV: empty + # ------------------------------------------------------------------ + with patch.dict("os.environ", {}, clear=True): + + # ------------------------------------------------------------------ + # 2) GH CLI is available + # ------------------------------------------------------------------ + with patch( + "pkgmgr.core.credentials.providers.gh.shutil.which", + return_value="/usr/bin/gh", + ): + with patch( + "pkgmgr.core.credentials.providers.gh.subprocess.check_output", + return_value="gh-invalid-token\n", + ): + + # ------------------------------------------------------------------ + # 3) Keyring returns an existing (invalid) token + # ------------------------------------------------------------------ + with patch( + "pkgmgr.core.credentials.providers.keyring._import_keyring" + ) as mock_import_keyring: + + mock_keyring = mock_import_keyring.return_value + mock_keyring.get_password.return_value = "keyring-invalid-token" + + # ------------------------------------------------------------------ + # 4) Prompt is allowed and returns a NEW token + # ------------------------------------------------------------------ + with patch( + "pkgmgr.core.credentials.providers.prompt.sys.stdin.isatty", + return_value=True, + ): + with patch( + "pkgmgr.core.credentials.providers.prompt.getpass", + return_value="new-valid-token", + ): + + # ------------------------------------------------------------------ + # 5) Validation logic: + # - gh token invalid + # - keyring token invalid + # - prompt token is NOT validated (by design) + # ------------------------------------------------------------------ + def validate_side_effect( + provider_kind: str, + host: str, + token: str, + ) -> bool: + return False # gh + keyring invalid + + with patch( + "pkgmgr.core.credentials.resolver.validate_token", + side_effect=validate_side_effect, + ) as validate_mock: + + result = resolver.get_token( + provider_kind="github", + host="github.com", + ) + + # ---------------------------------------------------------------------- + # Assertions + # ---------------------------------------------------------------------- + self.assertIsInstance(result, TokenResult) + self.assertEqual(result.token, "new-valid-token") + self.assertEqual(result.source, "prompt") + + # validate_token was called ONLY for gh and keyring + validated_tokens = [call.args[2] for call in validate_mock.call_args_list] + self.assertIn("gh-invalid-token", validated_tokens) + self.assertIn("keyring-invalid-token", validated_tokens) + self.assertNotIn("new-valid-token", validated_tokens) + + # Keyring must be overwritten with the new token + mock_keyring.set_password.assert_called_once() + service, username, stored_token = mock_keyring.set_password.call_args.args + self.assertEqual(stored_token, "new-valid-token") + + +if __name__ == "__main__": + unittest.main()