gpt-5.2 ChatGPT: integrate gh-based credential resolution with full integration test
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / lint-shell (push) Has been cancelled
Mark stable commit / lint-python (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
Some checks failed
Mark stable commit / test-unit (push) Has been cancelled
Mark stable commit / test-integration (push) Has been cancelled
Mark stable commit / test-env-virtual (push) Has been cancelled
Mark stable commit / test-env-nix (push) Has been cancelled
Mark stable commit / test-e2e (push) Has been cancelled
Mark stable commit / test-virgin-user (push) Has been cancelled
Mark stable commit / test-virgin-root (push) Has been cancelled
Mark stable commit / lint-shell (push) Has been cancelled
Mark stable commit / lint-python (push) Has been cancelled
Mark stable commit / mark-stable (push) Has been cancelled
- Add GhTokenProvider to read GitHub tokens via `gh auth token` - Extend TokenResolver policy: ENV → gh → keyring (validate) → prompt (overwrite) - Introduce provider-specific token validation for GitHub - Ensure invalid keyring tokens trigger interactive re-prompt and overwrite - Add end-to-end integration test covering gh → keyring → prompt flow - Clean up credentials package exports and documentation https://chatgpt.com/share/69418c81-6748-800f-8fec-616684746e3c
This commit is contained in:
@@ -1,4 +1,3 @@
|
||||
# src/pkgmgr/core/credentials/__init__.py
|
||||
"""Credential resolution for provider APIs."""
|
||||
|
||||
from .resolver import ResolutionOptions, TokenResolver
|
||||
|
||||
@@ -3,9 +3,11 @@
|
||||
from .env import EnvTokenProvider
|
||||
from .keyring import KeyringTokenProvider
|
||||
from .prompt import PromptTokenProvider
|
||||
from .gh import GhTokenProvider
|
||||
|
||||
__all__ = [
|
||||
"EnvTokenProvider",
|
||||
"KeyringTokenProvider",
|
||||
"PromptTokenProvider",
|
||||
"GhTokenProvider",
|
||||
]
|
||||
|
||||
43
src/pkgmgr/core/credentials/providers/gh.py
Normal file
43
src/pkgmgr/core/credentials/providers/gh.py
Normal file
@@ -0,0 +1,43 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import shutil
|
||||
import subprocess
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional
|
||||
|
||||
from ..types import TokenRequest, TokenResult
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class GhTokenProvider:
|
||||
"""
|
||||
Resolve a GitHub token via GitHub CLI (`gh auth token`).
|
||||
|
||||
This does NOT persist anything; it only reads what `gh` already knows.
|
||||
"""
|
||||
source_name: str = "gh"
|
||||
|
||||
def get(self, request: TokenRequest) -> Optional[TokenResult]:
|
||||
# Only meaningful for GitHub-like providers
|
||||
kind = (request.provider_kind or "").strip().lower()
|
||||
if kind not in ("github", "github.com"):
|
||||
return None
|
||||
|
||||
if not shutil.which("gh"):
|
||||
return None
|
||||
|
||||
host = (request.host or "").strip() or "github.com"
|
||||
|
||||
try:
|
||||
out = subprocess.check_output(
|
||||
["gh", "auth", "token", "--hostname", host],
|
||||
stderr=subprocess.STDOUT,
|
||||
text=True,
|
||||
).strip()
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
if not out:
|
||||
return None
|
||||
|
||||
return TokenResult(token=out, source=self.source_name)
|
||||
@@ -6,9 +6,11 @@ from dataclasses import dataclass
|
||||
from typing import Optional
|
||||
|
||||
from .providers.env import EnvTokenProvider
|
||||
from .providers.gh import GhTokenProvider
|
||||
from .providers.keyring import KeyringTokenProvider
|
||||
from .providers.prompt import PromptTokenProvider
|
||||
from .types import KeyringUnavailableError, NoCredentialsError, TokenRequest, TokenResult
|
||||
from .validate import validate_token
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
@@ -21,10 +23,24 @@ class ResolutionOptions:
|
||||
|
||||
|
||||
class TokenResolver:
|
||||
"""Resolve tokens from multiple sources (ENV -> Keyring -> Prompt)."""
|
||||
"""
|
||||
Resolve tokens for provider APIs using the following policy:
|
||||
|
||||
0) ENV (explicit user intent) -> return as-is (do NOT persist)
|
||||
1) GitHub CLI (gh) -> if available and token validates, return
|
||||
2) Keyring -> if token validates, return; if invalid and
|
||||
interactive prompting is allowed, prompt and
|
||||
OVERWRITE the keyring entry
|
||||
3) Prompt -> prompt and (optionally) store in keyring
|
||||
|
||||
Notes:
|
||||
- Keyring requires python-keyring.
|
||||
- Token validation is provider-specific (currently GitHub cloud).
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._env = EnvTokenProvider()
|
||||
self._gh = GhTokenProvider()
|
||||
self._keyring = KeyringTokenProvider()
|
||||
self._prompt = PromptTokenProvider()
|
||||
self._warned_keyring: bool = False
|
||||
@@ -48,6 +64,33 @@ class TokenResolver:
|
||||
print(" sudo dnf install python3-keyring", file=sys.stderr)
|
||||
print("", file=sys.stderr)
|
||||
|
||||
def _prompt_and_maybe_store(
|
||||
self,
|
||||
request: TokenRequest,
|
||||
opts: ResolutionOptions,
|
||||
) -> Optional[TokenResult]:
|
||||
"""
|
||||
Prompt for a token and optionally store it in keyring.
|
||||
If keyring is unavailable, still return the token for this run.
|
||||
"""
|
||||
if not (opts.interactive and opts.allow_prompt):
|
||||
return None
|
||||
|
||||
prompt_res = self._prompt.get(request)
|
||||
if not prompt_res:
|
||||
return None
|
||||
|
||||
if opts.save_prompt_token_to_keyring:
|
||||
try:
|
||||
self._keyring.set(request, prompt_res.token) # overwrite is fine
|
||||
except KeyringUnavailableError as exc:
|
||||
self._warn_keyring_unavailable(exc)
|
||||
except Exception:
|
||||
# If keyring cannot store, still use token for this run.
|
||||
pass
|
||||
|
||||
return prompt_res
|
||||
|
||||
def get_token(
|
||||
self,
|
||||
provider_kind: str,
|
||||
@@ -58,16 +101,29 @@ class TokenResolver:
|
||||
opts = options or ResolutionOptions()
|
||||
request = TokenRequest(provider_kind=provider_kind, host=host, owner=owner)
|
||||
|
||||
# 1) ENV
|
||||
# 0) ENV (highest priority; explicit user intent)
|
||||
env_res = self._env.get(request)
|
||||
if env_res:
|
||||
# Do NOT validate or persist env tokens automatically.
|
||||
return env_res
|
||||
|
||||
# 2) Keyring
|
||||
# 1) GitHub CLI (gh) (auto-read; validate)
|
||||
gh_res = self._gh.get(request)
|
||||
if gh_res and validate_token(request.provider_kind, request.host, gh_res.token):
|
||||
return gh_res
|
||||
|
||||
# 2) Keyring (validate; if invalid -> prompt + overwrite)
|
||||
try:
|
||||
kr_res = self._keyring.get(request)
|
||||
if kr_res:
|
||||
return kr_res
|
||||
if validate_token(request.provider_kind, request.host, kr_res.token):
|
||||
return kr_res
|
||||
|
||||
# Token exists but seems invalid -> re-prompt and overwrite keyring.
|
||||
renewed = self._prompt_and_maybe_store(request, opts)
|
||||
if renewed:
|
||||
return renewed
|
||||
|
||||
except KeyringUnavailableError as exc:
|
||||
# Show a helpful warning once, then continue (prompt fallback).
|
||||
self._warn_keyring_unavailable(exc)
|
||||
@@ -76,21 +132,12 @@ class TokenResolver:
|
||||
pass
|
||||
|
||||
# 3) Prompt (optional)
|
||||
if opts.interactive and opts.allow_prompt:
|
||||
prompt_res = self._prompt.get(request)
|
||||
if prompt_res:
|
||||
if opts.save_prompt_token_to_keyring:
|
||||
try:
|
||||
self._keyring.set(request, prompt_res.token)
|
||||
except KeyringUnavailableError as exc:
|
||||
self._warn_keyring_unavailable(exc)
|
||||
except Exception:
|
||||
# If keyring cannot store, still use token for this run.
|
||||
pass
|
||||
return prompt_res
|
||||
prompt_res = self._prompt_and_maybe_store(request, opts)
|
||||
if prompt_res:
|
||||
return prompt_res
|
||||
|
||||
raise NoCredentialsError(
|
||||
f"No token available for {provider_kind}@{host}"
|
||||
+ (f" (owner: {owner})" if owner else "")
|
||||
+ ". Provide it via environment variable or keyring."
|
||||
+ ". Provide it via environment variable, keyring, or gh auth."
|
||||
)
|
||||
|
||||
@@ -44,6 +44,7 @@ def env_var_candidates(provider_kind: str, host: str, owner: Optional[str]) -> l
|
||||
candidates.append(f"PKGMGR_{kind}_TOKEN")
|
||||
candidates.append(f"PKGMGR_TOKEN_{kind}")
|
||||
candidates.append("PKGMGR_TOKEN")
|
||||
|
||||
return candidates
|
||||
|
||||
|
||||
|
||||
41
src/pkgmgr/core/credentials/validate.py
Normal file
41
src/pkgmgr/core/credentials/validate.py
Normal file
@@ -0,0 +1,41 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Optional
|
||||
import urllib.request
|
||||
import json
|
||||
|
||||
|
||||
def validate_token(provider_kind: str, host: str, token: str) -> bool:
|
||||
"""
|
||||
Return True if token appears valid for the provider.
|
||||
Currently implemented for GitHub only.
|
||||
"""
|
||||
kind = (provider_kind or "").strip().lower()
|
||||
host = (host or "").strip() or "github.com"
|
||||
token = (token or "").strip()
|
||||
if not token:
|
||||
return False
|
||||
|
||||
if kind in ("github", "github.com") and host.lower() == "github.com":
|
||||
req = urllib.request.Request(
|
||||
"https://api.github.com/user",
|
||||
headers={
|
||||
"Authorization": f"Bearer {token}",
|
||||
"Accept": "application/vnd.github+json",
|
||||
"User-Agent": "pkgmgr",
|
||||
},
|
||||
method="GET",
|
||||
)
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=10) as resp:
|
||||
if resp.status != 200:
|
||||
return False
|
||||
# Optional: parse to ensure body is JSON
|
||||
_ = json.loads(resp.read().decode("utf-8"))
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
# Unknown provider: don't hard-fail validation (conservative default)
|
||||
# If you prefer strictness: return False here.
|
||||
return True
|
||||
106
tests/integration/test_token_resolver_flow.py
Normal file
106
tests/integration/test_token_resolver_flow.py
Normal file
@@ -0,0 +1,106 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import unittest
|
||||
from unittest.mock import patch
|
||||
|
||||
from pkgmgr.core.credentials.resolver import TokenResolver
|
||||
from pkgmgr.core.credentials.types import TokenResult
|
||||
|
||||
|
||||
class TestTokenResolverIntegration(unittest.TestCase):
|
||||
def test_full_resolution_flow_with_invalid_gh_and_keyring_then_prompt(self) -> None:
|
||||
"""
|
||||
Full integration scenario:
|
||||
|
||||
- ENV provides nothing
|
||||
- GitHub CLI (gh) is available and returns a token, but it is INVALID
|
||||
- Keyring contains a token, but it is INVALID
|
||||
- Interactive prompt provides a NEW token
|
||||
- New token is ACCEPTED and OVERWRITES the keyring entry
|
||||
"""
|
||||
|
||||
resolver = TokenResolver()
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# 1) ENV: empty
|
||||
# ------------------------------------------------------------------
|
||||
with patch.dict("os.environ", {}, clear=True):
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# 2) GH CLI is available
|
||||
# ------------------------------------------------------------------
|
||||
with patch(
|
||||
"pkgmgr.core.credentials.providers.gh.shutil.which",
|
||||
return_value="/usr/bin/gh",
|
||||
):
|
||||
with patch(
|
||||
"pkgmgr.core.credentials.providers.gh.subprocess.check_output",
|
||||
return_value="gh-invalid-token\n",
|
||||
):
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# 3) Keyring returns an existing (invalid) token
|
||||
# ------------------------------------------------------------------
|
||||
with patch(
|
||||
"pkgmgr.core.credentials.providers.keyring._import_keyring"
|
||||
) as mock_import_keyring:
|
||||
|
||||
mock_keyring = mock_import_keyring.return_value
|
||||
mock_keyring.get_password.return_value = "keyring-invalid-token"
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# 4) Prompt is allowed and returns a NEW token
|
||||
# ------------------------------------------------------------------
|
||||
with patch(
|
||||
"pkgmgr.core.credentials.providers.prompt.sys.stdin.isatty",
|
||||
return_value=True,
|
||||
):
|
||||
with patch(
|
||||
"pkgmgr.core.credentials.providers.prompt.getpass",
|
||||
return_value="new-valid-token",
|
||||
):
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# 5) Validation logic:
|
||||
# - gh token invalid
|
||||
# - keyring token invalid
|
||||
# - prompt token is NOT validated (by design)
|
||||
# ------------------------------------------------------------------
|
||||
def validate_side_effect(
|
||||
provider_kind: str,
|
||||
host: str,
|
||||
token: str,
|
||||
) -> bool:
|
||||
return False # gh + keyring invalid
|
||||
|
||||
with patch(
|
||||
"pkgmgr.core.credentials.resolver.validate_token",
|
||||
side_effect=validate_side_effect,
|
||||
) as validate_mock:
|
||||
|
||||
result = resolver.get_token(
|
||||
provider_kind="github",
|
||||
host="github.com",
|
||||
)
|
||||
|
||||
# ----------------------------------------------------------------------
|
||||
# Assertions
|
||||
# ----------------------------------------------------------------------
|
||||
self.assertIsInstance(result, TokenResult)
|
||||
self.assertEqual(result.token, "new-valid-token")
|
||||
self.assertEqual(result.source, "prompt")
|
||||
|
||||
# validate_token was called ONLY for gh and keyring
|
||||
validated_tokens = [call.args[2] for call in validate_mock.call_args_list]
|
||||
self.assertIn("gh-invalid-token", validated_tokens)
|
||||
self.assertIn("keyring-invalid-token", validated_tokens)
|
||||
self.assertNotIn("new-valid-token", validated_tokens)
|
||||
|
||||
# Keyring must be overwritten with the new token
|
||||
mock_keyring.set_password.assert_called_once()
|
||||
service, username, stored_token = mock_keyring.set_password.call_args.args
|
||||
self.assertEqual(stored_token, "new-valid-token")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user