from __future__ import annotations

import os
import re
import shlex
import shutil
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple

from .config import MANIFEST_NAMES
from .container_io import exec_shell, read_text_from_container
from .git_repo import checkout_repo, find_manifests
from .parsers import parse_manifest_by_name

# In-process cache of repository scan results, keyed by (source_url, revision).
# NOTE(review): the key does not include ``max_manifests`` and failed scans are
# cached too; confirm both are intended.
_REPO_CACHE: Dict[Tuple[str, Optional[str]], Dict[str, Any]] = {}

# Leading numeric version (up to four dotted components), optionally preceded
# by whitespace, a "v" prefix, or the operators "=", "==", ">", ">=", "~",
# "~=", "^". Upper-bound / exclusion operators ("<", "!") are deliberately not
# skipped, so such specs do not take part in the "highest version" tie-break.
_VERSION_RE = re.compile(r"^[\s=v^~>]*(\d+)(?:\.(\d+))?(?:\.(\d+))?(?:\.(\d+))?")

# File-name patterns handed to ``find -name`` when scanning a container.
# Extend this tuple to support another manifest type.
_MANIFEST_FIND_PATTERNS: Tuple[str, ...] = (
    # Python
    "requirements*.txt",
    "Pipfile",
    "pyproject.toml",
    "poetry.lock",
    # Node
    "package.json",
    "package-lock.json",
    "yarn.lock",
    "pnpm-lock.yaml",
    # Go
    "go.mod",
    "go.sum",
    # Rust
    "Cargo.toml",
    "Cargo.lock",
    # Java
    "pom.xml",
    "build.gradle",
    "build.gradle.kts",
    # .NET
    "*.csproj",
    "*.fsproj",
    "packages.config",
)


def dedupe_deps_exact(deps: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Drop exact duplicates, keeping the first occurrence and input order.

    Two entries are duplicates when their ``ecosystem``, ``name``, ``spec``
    and ``scope`` values are all equal. The surviving dicts are the original
    objects, not copies.
    """
    seen = set()
    unique: List[Dict[str, Any]] = []
    for dep in deps:
        identity = (
            dep.get("ecosystem"),
            dep.get("name"),
            dep.get("spec"),
            dep.get("scope"),
        )
        if identity in seen:
            continue
        seen.add(identity)
        unique.append(dep)
    return unique


def _parse_semverish(v: Optional[str]) -> Tuple[int, int, int, int]:
    """Parse the leading version number of a spec into a comparable 4-tuple.

    Missing components are 0. A "v" prefix and lower-bound / pin / compatible
    operators are skipped, so ``"==1.2.3"``, ``"v1.2.3"``, ``">=1.2.3"`` and
    ``"^1.2.3"`` all yield ``(1, 2, 3, 0)``. Empty input, or anything that
    does not start with a recognisable version, yields ``(0, 0, 0, 0)``.
    """
    if not v:
        return (0, 0, 0, 0)
    m = _VERSION_RE.match(str(v))
    if not m:
        return (0, 0, 0, 0)
    major, minor, patch, build = (int(g or 0) for g in m.groups())
    return (major, minor, patch, build)


def _priority(eco: str, spec: Optional[str], scope: Optional[str]) -> int:
    """Rank a dependency declaration; a higher value wins in ``dedupe_effective``.

    ``scope`` is compared case-insensitively. A ``lock`` scope ranks highest
    (100) for every ecosystem; otherwise the rank depends on the ecosystem and,
    for pypi, on whether the spec is an exact ``==`` pin. Combinations that
    match no rule rank 50.
    """
    scope_l = str(scope or "").lower()
    spec_s = str(spec or "").strip()

    if scope_l == "lock":
        return 100

    if eco == "go":
        if scope_l == "require":
            return 90
        if scope_l == "sum":
            return 10

    if eco == "npm":
        if scope_l == "dependencies":
            return 80
        if scope_l in {"optionaldependencies", "peerdependencies"}:
            return 60
        if scope_l == "devdependencies":
            return 15

    if eco == "pypi":
        if spec_s.startswith("=="):
            return 75
        if spec_s:
            return 55
        return 40

    if eco in {"cargo", "nuget", "maven", "gradle"}:
        return 60

    return 50


def dedupe_effective(deps: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Collapse dependencies to one entry per ``(ecosystem, name)``.

    The entry with the highest ``_priority`` wins; on equal priority the one
    whose spec parses to the strictly higher version wins, otherwise the
    earlier entry is kept. Entries without an ecosystem or a name are dropped.
    Output order follows the first appearance of each key, and every returned
    dict is a shallow copy of the winning input dict.
    """
    # The priority is stored next to the copied dict rather than inside it, so
    # no bookkeeping key is ever added to (or removed from) dependency data.
    best: Dict[Tuple[str, str], Tuple[int, Dict[str, Any]]] = {}
    for dep in deps:
        eco = dep.get("ecosystem")
        name = dep.get("name")
        if not eco or not name:
            continue

        spec = dep.get("spec")
        rank = _priority(str(eco), spec, dep.get("scope"))
        key = (str(eco), str(name))

        current = best.get(key)
        if current is not None:
            cur_rank, cur_dep = current
            if rank < cur_rank:
                continue
            if rank == cur_rank and (
                _parse_semverish(spec) <= _parse_semverish(cur_dep.get("spec"))
            ):
                continue
        # Re-assigning an existing key keeps its original position in the dict.
        best[key] = (rank, dict(dep))

    return [dep for _, dep in best.values()]


def find_manifest_paths_in_container(
    container, workdir: str, max_depth: int = 4, limit: int = 12
) -> List[str]:
    """List dependency-manifest files under ``workdir`` inside a container.

    Runs ``find`` through ``exec_shell`` from ``workdir`` (falling back to
    ``/`` when ``workdir`` is empty or cannot be entered) and returns the
    non-empty output lines, i.e. paths as printed by ``find .``.

    Args:
        container: Handle passed through to ``exec_shell``.
        workdir: Directory to search from.
        max_depth: Value for ``find -maxdepth``.
        limit: Maximum number of paths, enforced with ``head -n``.

    Returns:
        The discovered paths, or an empty list when the command exits non-zero.

    Raises:
        TypeError, ValueError: If ``max_depth`` or ``limit`` is not integral.
    """
    # Coerce to int so nothing but a number can be interpolated into the shell
    # command line.
    depth = int(max_depth)
    cap = int(limit)

    wd_q = shlex.quote(workdir or "/")
    name_expr = " -o ".join(f"-name '{pat}'" for pat in _MANIFEST_FIND_PATTERNS)
    find_cmd = (
        f"cd {wd_q} 2>/dev/null || cd /; "
        f"find . -maxdepth {depth} -type f \\( {name_expr} \\) "
        f"2>/dev/null | head -n {cap}"
    )

    exit_code, out = exec_shell(container, find_cmd)
    if exit_code != 0:
        return []
    return [line.strip() for line in out.splitlines() if line.strip()]


def extract_deps_from_container(
    container, workdir: str, rel_paths: List[str], max_manifests: int = 12
) -> Dict[str, Any]:
    """Read and parse manifest files from a container.

    Args:
        container: Handle passed through to ``read_text_from_container``.
        workdir: Base directory for relative paths; empty or ``None`` is
            treated as ``/``.
        rel_paths: Manifest paths. A leading ``./`` is resolved against
            ``workdir``; paths starting with ``/`` are used as-is.
        max_manifests: Only the first ``max_manifests`` paths are processed.

    Returns:
        A dict with ``manifests`` (absolute paths that were read
        successfully), ``dependencies`` (exact-deduplicated parse results) and
        ``errors`` (one ``"path: ExcType: message"`` string per failure).
    """
    base = (workdir or "/").rstrip("/")
    manifests: List[str] = []
    deps: List[Dict[str, Any]] = []
    errors: List[str] = []

    for rel in rel_paths[:max_manifests]:
        rel = rel.strip()
        if rel.startswith("/"):
            abs_path = rel
        else:
            tail = rel[2:] if rel.startswith("./") else rel
            abs_path = f"{base}/{tail}"

        try:
            text = read_text_from_container(container, abs_path)
            # Recorded once the read succeeded, even if parsing fails below.
            manifests.append(abs_path)
            deps.extend(parse_manifest_by_name(abs_path, text))
        except Exception as e:
            # Best effort: one unreadable or unparsable manifest must not
            # abort the whole scan.
            errors.append(f"{abs_path}: {type(e).__name__}: {e}")

    return {
        "manifests": manifests,
        "dependencies": dedupe_deps_exact(deps),
        "errors": errors,
    }


def _copy_result(result: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of a scan result whose lists and dependency dicts are new."""
    copied = dict(result)
    copied["manifests"] = list(result.get("manifests", []))
    copied["dependencies"] = [dict(d) for d in result.get("dependencies", [])]
    copied["errors"] = list(result.get("errors", []))
    return copied


def extract_deps_from_repo(
    source_url: str, revision: Optional[str], max_manifests: int = 12
) -> Dict[str, Any]:
    """Check out a repository and extract dependencies from its manifests.

    Results, including failures, are cached in ``_REPO_CACHE`` per
    ``(source_url, revision)``. The checkout directory is removed before
    returning.

    Args:
        source_url: Repository location passed to ``checkout_repo``.
        revision: Revision passed to ``checkout_repo``; may be ``None``.
        max_manifests: Maximum number of discovered manifests to parse.

    Returns:
        A dict with ``source_url``, ``revision``, ``manifests`` (repo-relative
        paths of every discovered manifest), ``dependencies``
        (exact-deduplicated) and ``errors``. Each call returns a fresh copy,
        so callers may mutate the result without affecting the cache.
    """
    key = (str(source_url), revision)
    cached = _REPO_CACHE.get(key)
    if cached is not None:
        return _copy_result(cached)

    repo_dir: Optional[Path] = None
    result: Dict[str, Any]
    try:
        repo_dir = checkout_repo(source_url, revision)
        # Discover at least 25 manifests, but never fewer than the caller
        # asked to have parsed.
        found = find_manifests(repo_dir, max_files=max(25, max_manifests))
        result = {
            "source_url": source_url,
            "revision": revision,
            "manifests": [str(p.relative_to(repo_dir)) for p in found],
            "dependencies": [],
            "errors": [],
        }
        for p in found[:max_manifests]:
            rel = str(p.relative_to(repo_dir))
            try:
                # Explicit UTF-8 keeps decoding independent of the host
                # locale; undecodable bytes are dropped.
                text = p.read_text(encoding="utf-8", errors="ignore")
                result["dependencies"].extend(parse_manifest_by_name(p.name, text))
            except Exception as e:
                result["errors"].append(f"{rel}: {type(e).__name__}: {e}")
        result["dependencies"] = dedupe_deps_exact(result["dependencies"])
    except Exception as e:
        # Checkout or discovery failed: report it as an empty result with a
        # single error entry instead of raising.
        result = {
            "source_url": source_url,
            "revision": revision,
            "manifests": [],
            "dependencies": [],
            "errors": [f"{type(e).__name__}: {e}"],
        }
    finally:
        if repo_dir is not None:
            shutil.rmtree(repo_dir, ignore_errors=True)

    _REPO_CACHE[key] = result
    return _copy_result(result)