Files
Chest a5714116ac +full refactor
+feat: configuration, progress bar, OSV
2026-01-18 13:54:14 +03:00

94 lines
3.0 KiB
Python

from __future__ import annotations
import os
import re
import shutil
import subprocess
import tempfile
from pathlib import Path
from typing import Optional, List
from urllib.parse import urlparse
from .config import SKIP_DIRS, MANIFEST_NAMES
def _safe_run(cmd: List[str], cwd: Optional[str | Path] = None, timeout: int = 120) -> subprocess.CompletedProcess:
    """Run *cmd* without a shell and return the completed process.

    The child receives a copy of the current environment with
    ``GIT_TERMINAL_PROMPT`` set to ``"0"``. Standard output and standard
    error are captured and decoded as text.

    Args:
        cmd: Argument vector to execute.
        cwd: Working directory for the child, or ``None`` to inherit.
        timeout: Seconds to wait before ``subprocess.run`` gives up.

    Returns:
        The ``subprocess.CompletedProcess``; the return code is not checked
        here, so callers must inspect it themselves.
    """
    # Disable interactive credential prompts so git does not hang on
    # private repositories.
    child_env = {**os.environ, "GIT_TERMINAL_PROMPT": "0"}
    return subprocess.run(
        cmd,
        cwd=cwd,
        env=child_env,
        capture_output=True,
        text=True,
        timeout=timeout,
    )
def normalize_git_url(url: str) -> str:
    """Normalize a Git remote URL to a canonical form without ``.git``.

    Handled forms:

    * scp-like SSH (``git@host:owner/repo.git``) is rewritten to
      ``https://host/owner/repo``.
    * ``http``/``https`` URLs keep only scheme, netloc and path; anything
      from the first ``/tree/`` or ``/blob/`` segment onwards is dropped,
      as are the query string and fragment.
    * Anything else is returned stripped of surrounding whitespace and of a
      trailing ``.git``.

    Trailing slashes are removed before the ``.git`` suffix check, so
    ``https://host/a/b.git/`` normalizes to ``https://host/a/b``.

    Args:
        url: The remote URL. Falsy values are returned unchanged.

    Returns:
        The normalized URL.
    """
    if not url:
        return url
    url = url.strip()

    # scp-like SSH syntax: git@host:owner/repo(.git)
    scp_match = re.match(r"git@([^:]+):(.+)$", url)
    if scp_match:
        host, path = scp_match.group(1), scp_match.group(2)
        # Trim slashes first; otherwise "repo.git/" would keep its suffix.
        path = path.rstrip("/")
        if path.endswith(".git"):
            path = path[:-4]
        return f"https://{host}/{path}".rstrip("/")

    parsed = urlparse(url)
    if parsed.scheme in ("http", "https"):
        clean_path = parsed.path
        # Cut web-UI deep links (".../tree/<ref>/...", ".../blob/<ref>/...")
        # back to the repository root.
        for marker in ("/tree/", "/blob/"):
            idx = clean_path.find(marker)
            if idx != -1:
                clean_path = clean_path[:idx]
        # Trim slashes first; otherwise "repo.git/" would keep its suffix.
        clean_path = clean_path.rstrip("/")
        if clean_path.endswith(".git"):
            clean_path = clean_path[:-4]
        # Removing ".git" can expose another trailing slash ("/a/b/.git").
        clean_path = clean_path.rstrip("/")
        return f"{parsed.scheme}://{parsed.netloc}{clean_path}"

    return url[:-4] if url.endswith(".git") else url
def checkout_repo(url: str, revision: Optional[str]) -> Path:
    """Fetch a shallow copy of a Git repository into a new temp directory.

    With a *revision*, the directory is initialised as an empty repository
    and only that revision is fetched (depth 1) and checked out as
    ``FETCH_HEAD``. Without one, a depth-1 ``git clone`` of the remote's
    default branch is made.

    Args:
        url: Remote URL; it is passed through ``normalize_git_url`` first.
        revision: Ref or commit to fetch, or ``None``/empty for a plain
            shallow clone.

    Returns:
        Path of the temporary directory holding the working tree. The
        caller owns it and is responsible for deleting it.

    Raises:
        RuntimeError: If a git command exits non-zero, or if *url* or
            *revision* looks like a command-line option.
        subprocess.TimeoutExpired: If a git command exceeds its timeout.
    """
    # Both values end up as bare git arguments; a leading "-" would be
    # parsed by git as an option rather than as a URL or ref.
    if revision and revision.startswith("-"):
        raise RuntimeError(f"Refusing option-like revision: {revision!r}")
    url = normalize_git_url(url)
    if url and url.startswith("-"):
        raise RuntimeError(f"Refusing option-like repository URL: {url!r}")

    tmpdir = Path(tempfile.mkdtemp(prefix="cvexplorer_repo_"))

    def run_git(args: List[str], timeout: int, cwd: Optional[Path] = None) -> None:
        # Run one git sub-command and fail loudly on a non-zero exit code.
        cmd = ["git", *args]
        p = _safe_run(cmd, cwd=cwd, timeout=timeout)
        if p.returncode != 0:
            raise RuntimeError(f"Command failed: {' '.join(cmd)}\n{p.stderr.strip()}")

    try:
        if revision:
            run_git(["init"], 30, cwd=tmpdir)
            run_git(["remote", "add", "origin", url], 30, cwd=tmpdir)
            run_git(["fetch", "--depth", "1", "origin", revision], 120, cwd=tmpdir)
            run_git(["checkout", "FETCH_HEAD"], 30, cwd=tmpdir)
        else:
            # The directory is removed so that git clone creates it itself.
            shutil.rmtree(tmpdir, ignore_errors=True)
            run_git(["clone", "--depth", "1", url, str(tmpdir)], 180)
    except BaseException:
        # Do not leave a partial checkout behind on failure, timeout or
        # interruption; the original exception is re-raised unchanged.
        shutil.rmtree(tmpdir, ignore_errors=True)
        raise
    return tmpdir
def find_manifests(repo_dir: Path, max_files: int = 25) -> List[Path]:
    """Collect dependency manifest files found under *repo_dir*.

    A file counts as a manifest when its name is in ``MANIFEST_NAMES`` or
    ends with ``.csproj`` / ``.fsproj``. Directories whose name is in
    ``SKIP_DIRS`` are not descended into.

    Args:
        repo_dir: Root directory to walk.
        max_files: Stop walking once this many manifests were collected.

    Returns:
        The collected paths, ordered by how many path components they have
        relative to *repo_dir* (shallowest first). The limit is applied in
        walk order, before this ordering.
    """
    project_suffixes = (".csproj", ".fsproj")
    manifests: List[Path] = []

    for current, subdirs, filenames in os.walk(repo_dir):
        # Prune in place so os.walk never enters the skipped directories.
        subdirs[:] = [name for name in subdirs if name not in SKIP_DIRS]

        for filename in filenames:
            if not (filename in MANIFEST_NAMES or filename.endswith(project_suffixes)):
                continue
            manifests.append(Path(current) / filename)
            if len(manifests) >= max_files:
                break

        # Propagate the inner break to stop the whole walk.
        if len(manifests) >= max_files:
            break

    def depth(path: Path) -> int:
        return len(path.relative_to(repo_dir).parts)

    return sorted(manifests, key=depth)