from __future__ import annotations

import os
import re
import shutil
import subprocess
import tempfile
from pathlib import Path
from typing import Optional, List
from urllib.parse import urlparse

from .config import SKIP_DIRS, MANIFEST_NAMES


def _safe_run(
    cmd: List[str],
    cwd: Optional[str | Path] = None,
    timeout: int = 120,
) -> subprocess.CompletedProcess:
    """Run *cmd* without a shell and capture its output as text.

    Args:
        cmd: Program and arguments, passed to ``subprocess.run`` as a list.
        cwd: Working directory for the child process, or ``None`` to inherit.
        timeout: Seconds to wait before ``subprocess.run`` raises
            ``subprocess.TimeoutExpired``.

    Returns:
        The ``CompletedProcess``; the return code is NOT checked here, so
        callers must inspect ``returncode`` themselves.
    """
    env = dict(os.environ)
    env["GIT_TERMINAL_PROMPT"] = "0"  # do not hang on private repos
    return subprocess.run(cmd, cwd=cwd, env=env, capture_output=True, text=True, timeout=timeout)


def _strip_repo_suffix(path: str) -> str:
    """Drop trailing slashes and a trailing ``.git`` from a repository path."""
    # Slashes go first so that "repo.git/" is recognised as ending in ".git".
    path = path.rstrip("/")
    if path.endswith(".git"):
        path = path[:-4]
    # Removing ".git" can expose another trailing slash (e.g. "a/.git").
    return path.rstrip("/")


def normalize_git_url(url: str) -> str:
    """Reduce a repository URL to a canonical clone URL.

    * ``git@host:owner/repo(.git)`` becomes ``https://host/owner/repo``.
    * ``http(s)`` URLs lose any web-view tail (``/tree/...``, ``/blob/...``,
      including the GitLab ``/-/tree/...`` form), query string, fragment,
      trailing slashes and ``.git`` suffix.
    * Anything else is returned stripped of surrounding whitespace and of a
      trailing ``.git``.

    Falsy input (``""`` or ``None``) is returned unchanged.
    """
    if not url:
        return url
    url = url.strip()

    scp_style = re.match(r"git@([^:]+):(.+)$", url)
    if scp_style:
        host, path = scp_style.group(1), scp_style.group(2)
        return f"https://{host}/{_strip_repo_suffix(path)}".rstrip("/")

    parsed = urlparse(url)
    if parsed.scheme in ("http", "https"):
        clean_path = parsed.path
        # The "/-/" variants must be tried first: GitLab web URLs look like
        # "/group/project/-/tree/main", and cutting at "/tree/" alone would
        # leave a dangling "/-" segment that is not a clonable path.
        for marker in ("/-/tree/", "/-/blob/", "/tree/", "/blob/"):
            idx = clean_path.find(marker)
            if idx != -1:
                clean_path = clean_path[:idx]
        return f"{parsed.scheme}://{parsed.netloc}{_strip_repo_suffix(clean_path)}"

    return url[:-4] if url.endswith(".git") else url


def checkout_repo(url: str, revision: Optional[str]) -> Path:
    """Fetch a repository into a fresh temporary directory.

    With a *revision*, only that revision is fetched (depth 1) and checked
    out detached at ``FETCH_HEAD``; without one, the default branch is
    shallow-cloned.

    Args:
        url: Repository URL; it is passed through ``normalize_git_url`` first.
        revision: Branch, tag or commit to fetch, or ``None``/``""`` for the
            remote's default branch.

    Returns:
        Path of the temporary directory holding the working tree. The
        directory is not removed on success; the caller owns it.

    Raises:
        RuntimeError: If *url* or *revision* looks like a command-line option,
            or if any git command exits with a non-zero status.
        subprocess.TimeoutExpired: If a git command exceeds its timeout.
    """
    url = normalize_git_url(url)

    # A value starting with "-" would be parsed by git as an option rather
    # than as a repository or ref, so refuse it before doing any work.
    for label, value in (("url", url), ("revision", revision)):
        if value and value.startswith("-"):
            raise RuntimeError(f"Refusing option-like {label}: {value!r}")

    tmpdir = Path(tempfile.mkdtemp(prefix="cvexplorer_repo_"))

    def run_checked(cmd: List[str], cwd: Optional[Path] = None, timeout: int = 30) -> None:
        p = _safe_run(cmd, cwd=cwd, timeout=timeout)
        if p.returncode != 0:
            raise RuntimeError(f"Command failed: {' '.join(cmd)}\n{p.stderr.strip()}")

    try:
        if revision:
            run_checked(["git", "init"], cwd=tmpdir, timeout=30)
            run_checked(["git", "remote", "add", "origin", url], cwd=tmpdir, timeout=30)
            run_checked(["git", "fetch", "--depth", "1", "origin", revision], cwd=tmpdir, timeout=120)
            run_checked(["git", "checkout", "FETCH_HEAD"], cwd=tmpdir, timeout=30)
        else:
            # git accepts an existing *empty* directory as the clone target,
            # so the directory created by mkdtemp is reused as-is instead of
            # being deleted and recreated under the same predictable name.
            run_checked(["git", "clone", "--depth", "1", url, str(tmpdir)], timeout=180)
    except BaseException:
        # Do not leak the temporary directory on failure, timeout or
        # interruption; the original exception is re-raised untouched.
        shutil.rmtree(tmpdir, ignore_errors=True)
        raise
    return tmpdir


def find_manifests(repo_dir: Path, max_files: int = 25) -> List[Path]:
    """Collect dependency manifest files below *repo_dir*.

    A file counts as a manifest when its name is in ``MANIFEST_NAMES`` or it
    ends in ``.csproj`` / ``.fsproj``. Directories named in ``SKIP_DIRS`` are
    not descended into. Directory and file names are visited in sorted order
    so the result does not depend on filesystem enumeration order.

    Args:
        repo_dir: Root directory to scan.
        max_files: Maximum number of manifests to return; values ``<= 0``
            yield an empty list.

    Returns:
        At most *max_files* paths, ordered by depth relative to *repo_dir*
        (shallowest first).
    """
    if max_files <= 0:
        return []

    found: List[Path] = []
    # NOTE(review): the cap is applied in walk order, before the depth sort,
    # so once the cap is hit inside a deep subtree, shallower manifests in
    # directories visited later are not seen. Kept as-is to bound the walk.
    for root, dirs, files in os.walk(repo_dir):
        # In-place assignment prunes and orders what os.walk descends into.
        dirs[:] = sorted(d for d in dirs if d not in SKIP_DIRS)
        for fname in sorted(files):
            if fname in MANIFEST_NAMES or fname.endswith((".csproj", ".fsproj")):
                found.append(Path(root) / fname)
                if len(found) >= max_files:
                    break
        if len(found) >= max_files:
            break

    found.sort(key=lambda p: len(p.relative_to(repo_dir).parts))
    return found