from __future__ import annotations import json import re import urllib.request import urllib.error from collections import defaultdict from typing import List, Dict, Any, Tuple, Optional, Iterable from .config import OSV_QUERYBATCH_URL, OSV_ECOSYSTEM_MAP from .deps_pipeline import dedupe_effective from .progress import progress OSV_VULN_URL_TEMPLATE = "https://api.osv.dev/v1/vulns/{id}" _OSV_RESULT_CACHE: Dict[Tuple[str, str, str], List[str]] = {} _OSV_VULN_CACHE: Dict[str, Dict[str, Any]] = {} _SEVERITY_ORDER = {"UNKNOWN": 0, "LOW": 1, "MEDIUM": 2, "HIGH": 3, "CRITICAL": 4} def _sev_rank(level: str) -> int: return _SEVERITY_ORDER.get(level.upper(), 0) def _score_to_severity(score: float) -> str: if score >= 9.0: return "CRITICAL" if score >= 7.0: return "HIGH" if score >= 4.0: return "MEDIUM" if score > 0.0: return "LOW" return "UNKNOWN" def _normalize_db_specific_severity(s: str) -> str: s = s.strip().upper() if s in {"LOW", "MEDIUM", "HIGH", "CRITICAL"}: return s if s == "MODERATE": return "MEDIUM" return "UNKNOWN" def _compute_vuln_severity(v: Dict[str, Any]) -> str: dbs = (v.get("database_specific") or {}) if isinstance(dbs, dict) and dbs.get("severity"): return _normalize_db_specific_severity(str(dbs.get("severity"))) sev = v.get("severity") or [] if isinstance(sev, list): for item in sev: if not isinstance(item, dict): continue score = item.get("score") if score is None: continue try: sc = float(str(score)) return _score_to_severity(sc) except ValueError: continue return "UNKNOWN" def _http_get_json(url: str, timeout: int = 30) -> Dict[str, Any]: req = urllib.request.Request(url=url, headers={"Accept": "application/json"}, method="GET") with urllib.request.urlopen(req, timeout=timeout) as resp: body = resp.read().decode("utf-8", errors="ignore") return json.loads(body) def hydrate_vulns(ids: Iterable[str], *, progress_enabled: bool = True, max_ids: int = 300) -> None: uniq: List[str] = [] seen = set() for vid in ids: vid = str(vid) if vid in seen: continue seen.add(vid) if vid in _OSV_VULN_CACHE: continue uniq.append(vid) if len(uniq) >= max_ids: break it = progress(uniq, total=len(uniq), desc="OSV: загрузка деталей") if progress_enabled else iter(uniq) for vid in it: try: v = _http_get_json(OSV_VULN_URL_TEMPLATE.format(id=vid)) if isinstance(v, dict): _OSV_VULN_CACHE[vid] = v except Exception: continue def _is_exact_version_for_osv(internal_eco: str, spec: Optional[str], scope: Optional[str]) -> Optional[str]: if not spec: return None s = str(spec).strip() sc = (scope or "").lower() eco = internal_eco.lower() if sc == "lock": return s.lstrip("=") if eco == "go" and sc == "require": return s if eco == "pypi": if s.startswith("=="): return s[2:].strip() return None if eco == "npm": if any(s.startswith(x) for x in ("^", "~", ">", "<", "*")): return None if re.match(r"^v?\d+(\.\d+){0,3}([\-\+].+)?$", s): return s.lstrip("v") return None if eco in {"maven", "gradle", "nuget", "cargo"}: if "${" in s or s.startswith(("(", "[", "{")) or any(op in s for op in (">", "<", "*", ",")): return None if re.match(r"^v?\d+(\.\d+){0,3}([\-\+].+)?$", s): return s.lstrip("v") return None return None def _http_post_json(url: str, payload: Dict[str, Any], timeout: int = 30) -> Dict[str, Any]: data = json.dumps(payload).encode("utf-8") req = urllib.request.Request( url=url, data=data, headers={"Content-Type": "application/json", "Accept": "application/json"}, method="POST", ) with urllib.request.urlopen(req, timeout=timeout) as resp: body = resp.read().decode("utf-8", errors="ignore") return json.loads(body) def querybatch_paginated(queries: List[Dict[str, Any]], max_pages: int = 5) -> List[Dict[str, Any]]: if not queries: return [] data = _http_post_json(OSV_QUERYBATCH_URL, {"queries": queries}) results = data.get("results", []) if not isinstance(results, list): return [{"vulns": []} for _ in queries] acc: List[Dict[str, Any]] = [{"vulns": []} for _ in queries] next_tokens: Dict[int, str] = {} for i, r in enumerate(results): vulns = (r or {}).get("vulns", []) or [] if isinstance(vulns, list): acc[i]["vulns"].extend(vulns) tok = (r or {}).get("next_page_token") if tok: next_tokens[i] = tok pages = 1 while next_tokens and pages < max_pages: page_queries: List[Dict[str, Any]] = [] index_map: List[int] = [] for orig_idx, tok in next_tokens.items(): q = dict(queries[orig_idx]) q["page_token"] = tok page_queries.append(q) index_map.append(orig_idx) data = _http_post_json(OSV_QUERYBATCH_URL, {"queries": page_queries}) page_results = data.get("results", []) new_next: Dict[int, str] = {} for j, r in enumerate(page_results): orig_idx = index_map[j] vulns = (r or {}).get("vulns", []) or [] if isinstance(vulns, list): acc[orig_idx]["vulns"].extend(vulns) tok = (r or {}).get("next_page_token") if tok: new_next[orig_idx] = tok next_tokens = new_next pages += 1 if next_tokens: for idx in next_tokens.keys(): acc[idx]["truncated"] = True return acc def annotate_containers_with_osv( containers: List[Dict[str, Any]], *, chunk_size: int = 250, hydrate_details: bool = True, max_hydrate_ids: int = 300, min_severity: str = "MEDIUM", include_unknown: bool = True, progress_enabled: bool = True, ) -> None: to_query: List[Tuple[str, str, str]] = [] for c in containers: deps = c.get("dependencies") or [] deps_eff = dedupe_effective(deps) pinned: List[Dict[str, str]] = [] for d in deps_eff: internal_eco = (d.get("ecosystem") or "").lower() name = d.get("name") spec = d.get("spec") scope = d.get("scope") if not internal_eco or not name: continue osv_eco = OSV_ECOSYSTEM_MAP.get(internal_eco) if not osv_eco: continue ver = _is_exact_version_for_osv(internal_eco, spec, scope) if not ver: continue pinned.append({"ecosystem": osv_eco, "name": str(name), "version": str(ver)}) seen = set() pinned_u: List[Dict[str, str]] = [] for p in pinned: k = (p["ecosystem"], p["name"], p["version"]) if k in seen: continue seen.add(k) pinned_u.append(p) if k not in _OSV_RESULT_CACHE and k not in to_query: to_query.append(k) c["osv_pinned_deps"] = pinned_u if not to_query: for c in containers: c["osv_vuln_count"] = 0 c["osv_vuln_counts_by_severity"] = {} c["osv_affected_deps"] = [] c["osv_vulns_by_dep"] = {} return try: total_chunks = (len(to_query) + chunk_size - 1) // chunk_size chunk_indices = range(0, len(to_query), chunk_size) chunk_it = progress(chunk_indices, total=total_chunks, desc="OSV: querybatch") if progress_enabled else iter(chunk_indices) for start in chunk_it: batch = to_query[start:start + chunk_size] queries = [ {"package": {"ecosystem": eco, "name": name}, "version": ver} for (eco, name, ver) in batch ] results = querybatch_paginated(queries) for i, r in enumerate(results): eco, name, ver = batch[i] vulns = (r or {}).get("vulns", []) or [] ids: List[str] = [] for v in vulns: vid = (v or {}).get("id") if vid: ids.append(str(vid)) seen = set() ids_u: List[str] = [] for x in ids: if x in seen: continue seen.add(x) ids_u.append(x) _OSV_RESULT_CACHE[(eco, name, ver)] = ids_u except (urllib.error.URLError, urllib.error.HTTPError, TimeoutError, ValueError) as e: for c in containers: c["osv_errors"] = f"OSV query failed: {type(e).__name__}: {e}" c["osv_vuln_count"] = 0 c["osv_vuln_counts_by_severity"] = {} c["osv_affected_deps"] = [] c["osv_vulns_by_dep"] = {} return if hydrate_details: all_ids: List[str] = [] seen = set() for ids in _OSV_RESULT_CACHE.values(): for vid in ids: if vid in seen: continue seen.add(vid) all_ids.append(vid) hydrate_vulns(all_ids, progress_enabled=progress_enabled, max_ids=max_hydrate_ids) threshold = _sev_rank(min_severity) for c in containers: pinned = c.get("osv_pinned_deps") or [] vulns_by_dep: Dict[str, List[str]] = {} affected: List[Tuple[str, str, str, int, str]] = [] all_ids = set() counts: Dict[str, int] = defaultdict(int) for p in pinned: eco = p["ecosystem"] name = p["name"] ver = p["version"] ids = _OSV_RESULT_CACHE.get((eco, name, ver), []) if not ids: continue kept: List[str] = [] max_dep_sev = "UNKNOWN" for vid in ids: sev = "UNKNOWN" if hydrate_details and vid in _OSV_VULN_CACHE: sev = _compute_vuln_severity(_OSV_VULN_CACHE[vid]) if _sev_rank(sev) >= threshold or (sev == "UNKNOWN" and include_unknown): kept.append(vid) all_ids.add(vid) counts[sev] += 1 if _sev_rank(sev) > _sev_rank(max_dep_sev): max_dep_sev = sev if kept: key_str = f"{eco}:{name}@{ver}" vulns_by_dep[key_str] = kept affected.append((eco, name, ver, len(kept), max_dep_sev)) affected.sort(key=lambda x: (x[3], _sev_rank(x[4])), reverse=True) c["osv_vuln_count"] = len(all_ids) c["osv_vuln_counts_by_severity"] = dict(counts) c["osv_affected_deps"] = affected c["osv_vulns_by_dep"] = vulns_by_dep