a5714116ac
+feat: configuration, progress bar, OSV
216 lines
6.3 KiB
Python
216 lines
6.3 KiB
Python
from __future__ import annotations
|
|
|
|
import os
|
|
import shlex
|
|
import shutil
|
|
from pathlib import Path
|
|
from typing import List, Dict, Any, Optional, Tuple
|
|
|
|
from .config import MANIFEST_NAMES
|
|
from .container_io import exec_shell, read_text_from_container
|
|
from .git_repo import checkout_repo, find_manifests
|
|
from .parsers import parse_manifest_by_name
|
|
|
|
# In-process memo used by extract_deps_from_repo(), keyed by
# (source_url, revision). Both successful and failed extraction results are
# stored here; nothing in the code visible in this file evicts entries.
_REPO_CACHE: Dict[Tuple[str, Optional[str]], Dict[str, Any]] = {}
|
|
|
|
|
|
def dedupe_deps_exact(deps: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Drop exact duplicate dependency records, keeping the first occurrence.

    Two records count as duplicates when their ``ecosystem``, ``name``,
    ``spec`` and ``scope`` values are all equal (a missing key is treated as
    ``None``). Survivors are returned in their original relative order and
    are the very same dict objects that were passed in.
    """
    first_seen: Dict[Tuple[Any, Any, Any, Any], Dict[str, Any]] = {}
    for dep in deps:
        identity = (
            dep.get("ecosystem"),
            dep.get("name"),
            dep.get("spec"),
            dep.get("scope"),
        )
        # setdefault stores the record only the first time an identity shows
        # up; dict insertion order then gives us the original ordering.
        first_seen.setdefault(identity, dep)
    return list(first_seen.values())
|
|
|
|
|
|
def _parse_semverish(v: Optional[str]) -> Tuple[int, int, int, int]:
|
|
if not v:
|
|
return (0, 0, 0, 0)
|
|
s = str(v).strip().lstrip("=v")
|
|
import re
|
|
m = re.match(r"^(\d+)(?:\.(\d+))?(?:\.(\d+))?(?:\.(\d+))?", s)
|
|
if not m:
|
|
return (0, 0, 0, 0)
|
|
parts = [int(x) if x is not None else 0 for x in m.groups()]
|
|
return tuple(parts) # type: ignore
|
|
|
|
|
|
def _priority(eco: str, spec: Optional[str], scope: Optional[str]) -> int:
|
|
scope_l = (scope or "").lower()
|
|
spec_s = (spec or "").strip()
|
|
if scope_l == "lock":
|
|
return 100
|
|
if eco == "go":
|
|
if scope_l == "require":
|
|
return 90
|
|
if scope_l == "sum":
|
|
return 10
|
|
if eco == "npm":
|
|
if scope_l == "dependencies":
|
|
return 80
|
|
if scope_l in {"optionaldependencies", "peerdependencies"}:
|
|
return 60
|
|
if scope_l == "devdependencies":
|
|
return 15
|
|
if eco == "pypi":
|
|
if spec_s.startswith("=="):
|
|
return 75
|
|
if spec_s:
|
|
return 55
|
|
return 40
|
|
if eco in {"cargo", "nuget", "maven", "gradle"}:
|
|
return 60
|
|
|
|
return 50
|
|
|
|
|
|
def dedupe_effective(deps: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Collapse dependency records to one winner per (ecosystem, name).

    Records lacking a truthy ``ecosystem`` or ``name`` are dropped. Among the
    records for one package the highest ``_priority`` score wins; on a tie,
    the record whose spec parses to the strictly greater ``_parse_semverish``
    tuple wins, otherwise the earlier record is kept.

    The result holds shallow copies of the winning records, ordered by the
    first appearance of each package. Input dicts are not modified. A
    ``"_pr"`` key is used internally as scratch space and is removed from
    every returned record.
    """
    winners: Dict[Tuple[str, str], Dict[str, Any]] = {}

    for dep in deps:
        ecosystem = dep.get("ecosystem")
        package = dep.get("name")
        if not (ecosystem and package):
            continue

        spec = dep.get("spec")
        rank = _priority(str(ecosystem), spec, dep.get("scope"))
        slot = (str(ecosystem), str(package))

        holder = winners.get(slot)
        if holder is None:
            takes_over = True
        elif rank != holder["_pr"]:
            takes_over = rank > holder["_pr"]
        else:
            # Same priority: prefer the strictly higher parsed version.
            challenger_version = _parse_semverish(str(spec) if spec else "")
            holder_version = _parse_semverish(str(holder.get("spec") or ""))
            takes_over = challenger_version > holder_version

        if takes_over:
            candidate = dict(dep)
            candidate["_pr"] = rank
            winners[slot] = candidate

    chosen = list(winners.values())
    for record in chosen:
        record.pop("_pr", None)
    return chosen
|
|
|
|
|
|
def find_manifest_paths_in_container(container, workdir: str, max_depth: int = 4, limit: int = 12) -> List[str]:
    """List dependency-manifest files found under ``workdir`` in a container.

    Runs ``find . -maxdepth <max_depth> -type f`` through ``exec_shell`` after
    changing into ``workdir`` (or ``/`` when ``workdir`` is empty or the
    ``cd`` fails), matching a fixed set of manifest file names and keeping at
    most ``limit`` lines of output via ``head``.

    Returns:
        The non-blank, stripped output lines, i.e. paths as printed by
        ``find .``. An empty list is returned when the command exits with a
        non-zero status.
    """
    # How many more of these will we have to keep adding...
    name_patterns = (
        # Python
        "requirements*.txt",
        "Pipfile",
        "pyproject.toml",
        "poetry.lock",
        # Node
        "package.json",
        "package-lock.json",
        "yarn.lock",
        "pnpm-lock.yaml",
        # Go
        "go.mod",
        "go.sum",
        # Rust
        "Cargo.toml",
        "Cargo.lock",
        # Java
        "pom.xml",
        "build.gradle",
        "build.gradle.kts",
        # .NET
        "*.csproj",
        "*.fsproj",
        "packages.config",
    )
    name_tests = " -o ".join(f"-name '{pattern}'" for pattern in name_patterns)
    quoted_dir = shlex.quote(workdir or "/")

    command = (
        f"cd {quoted_dir} 2>/dev/null || cd /; "
        f"find . -maxdepth {max_depth} -type f \\( {name_tests} \\) "
        f"2>/dev/null | head -n {str(limit)}"
    )

    status, output = exec_shell(container, command)
    if status != 0:
        return []

    stripped_lines = (raw.strip() for raw in output.splitlines())
    return [path for path in stripped_lines if path]
|
|
|
|
|
|
def extract_deps_from_container(
    container,
    workdir: str,
    rel_paths: List[str],
    max_manifests: int = 12,
) -> Dict[str, Any]:
    """Read and parse manifest files from inside a container.

    Args:
        container: Handle passed through unchanged to
            ``read_text_from_container``.
        workdir: Directory that relative entries of ``rel_paths`` are
            resolved against. An empty or ``None`` value is treated as ``/``,
            matching ``find_manifest_paths_in_container``.
        rel_paths: Manifest paths. A leading ``./`` is dropped before joining
            with ``workdir``; entries starting with ``/`` are used as-is.
        max_manifests: Only the first ``max_manifests`` entries of
            ``rel_paths`` are processed (default 12, the previous fixed cap).

    Returns:
        A dict with three keys: ``"manifests"`` holds the absolute paths that
        were read successfully (even if parsing them failed afterwards),
        ``"dependencies"`` holds the parsed records after
        ``dedupe_deps_exact``, and ``"errors"`` holds one
        ``"<path>: <ExceptionType>: <message>"`` string per failed file.
    """
    manifests: List[str] = []
    deps: List[Dict[str, Any]] = []
    errors: List[str] = []

    # Guard against a missing workdir instead of crashing on None.rstrip().
    base = (workdir or "/").rstrip("/")

    for rel in rel_paths[:max_manifests]:
        rel = rel.strip()

        if rel.startswith("/"):
            abs_path = rel
        elif rel.startswith("./"):
            abs_path = base + "/" + rel[2:]
        else:
            abs_path = base + "/" + rel

        # Best effort: one unreadable or unparsable manifest must not abort
        # the rest, so the failure is recorded and the loop moves on.
        try:
            text = read_text_from_container(container, abs_path)
            manifests.append(abs_path)
            deps.extend(parse_manifest_by_name(abs_path, text))
        except Exception as e:
            errors.append(f"{abs_path}: {type(e).__name__}: {e}")

    deps = dedupe_deps_exact(deps)
    return {"manifests": manifests, "dependencies": deps, "errors": errors}
|
|
|
|
|
|
def extract_deps_from_repo(source_url: str, revision: Optional[str], max_manifests: int = 12) -> Dict[str, Any]:
    """Check out a repository and extract dependencies from its manifests.

    Results are memoised in ``_REPO_CACHE`` under ``(str(source_url),
    revision)``. Failed extractions are cached too, and the cached dict is
    returned by reference, so callers that mutate it also alter the cache.

    Args:
        source_url: Repository location handed to ``checkout_repo``.
        revision: Revision handed to ``checkout_repo``; may be ``None``.
        max_manifests: At most this many of the discovered manifests are
            parsed. ``"manifests"`` still lists every manifest returned by
            ``find_manifests`` (which is asked for up to 25 files).

    Returns:
        A dict with keys ``"source_url"``, ``"revision"``, ``"manifests"``
        (paths relative to the checkout), ``"dependencies"`` (after
        ``dedupe_deps_exact``) and ``"errors"``. If checkout or discovery
        fails, ``"manifests"`` and ``"dependencies"`` are empty and
        ``"errors"`` holds a single ``"<ExceptionType>: <message>"`` entry.
    """
    key = (str(source_url), revision)
    if key in _REPO_CACHE:
        return _REPO_CACHE[key]

    repo_dir: Optional[Path] = None
    result: Dict[str, Any]
    try:
        repo_dir = checkout_repo(source_url, revision)
        found = find_manifests(repo_dir, max_files=25)

        result = {
            "source_url": source_url,
            "revision": revision,
            "manifests": [str(p.relative_to(repo_dir)) for p in found],
            "dependencies": [],
            "errors": [],
        }

        for p in found[:max_manifests]:
            rel = str(p.relative_to(repo_dir))
            try:
                # Decode explicitly as UTF-8: with errors="ignore" a
                # locale-dependent default would silently drop non-ASCII
                # characters on hosts with a non-UTF-8 locale.
                text = p.read_text(encoding="utf-8", errors="ignore")
                result["dependencies"].extend(parse_manifest_by_name(p.name, text))
            except Exception as e:
                result["errors"].append(f"{rel}: {type(e).__name__}: {e}")

        result["dependencies"] = dedupe_deps_exact(result["dependencies"])

    except Exception as e:
        # Any failure outside the per-file handler discards partial output
        # and reports just this error.
        result = {
            "source_url": source_url,
            "revision": revision,
            "manifests": [],
            "dependencies": [],
            "errors": [f"{type(e).__name__}: {e}"],
        }

    finally:
        # Remove the checkout on every path, success or failure.
        if repo_dir is not None:
            shutil.rmtree(repo_dir, ignore_errors=True)

    _REPO_CACHE[key] = result
    return result
|