a5714116ac
+feat: configuration, progress bar, OSV
355 lines
11 KiB
Python
355 lines
11 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import re
|
|
import urllib.request
|
|
import urllib.error
|
|
from collections import defaultdict
|
|
from typing import List, Dict, Any, Tuple, Optional, Iterable
|
|
|
|
from .config import OSV_QUERYBATCH_URL, OSV_ECOSYSTEM_MAP
|
|
from .deps_pipeline import dedupe_effective
|
|
from .progress import progress
|
|
|
|
|
|
OSV_VULN_URL_TEMPLATE = "https://api.osv.dev/v1/vulns/{id}"
|
|
|
|
_OSV_RESULT_CACHE: Dict[Tuple[str, str, str], List[str]] = {}
|
|
|
|
_OSV_VULN_CACHE: Dict[str, Dict[str, Any]] = {}
|
|
|
|
_SEVERITY_ORDER = {"UNKNOWN": 0, "LOW": 1, "MEDIUM": 2, "HIGH": 3, "CRITICAL": 4}
|
|
|
|
def _sev_rank(level: str) -> int:
    """Return the numeric rank of a severity label.

    The lookup is case-insensitive; labels missing from ``_SEVERITY_ORDER``
    rank as 0.
    """
    label = level.upper()
    if label in _SEVERITY_ORDER:
        return _SEVERITY_ORDER[label]
    return 0
|
|
|
|
|
|
def _score_to_severity(score: float) -> str:
|
|
if score >= 9.0:
|
|
return "CRITICAL"
|
|
if score >= 7.0:
|
|
return "HIGH"
|
|
if score >= 4.0:
|
|
return "MEDIUM"
|
|
if score > 0.0:
|
|
return "LOW"
|
|
return "UNKNOWN"
|
|
|
|
|
|
def _normalize_db_specific_severity(s: str) -> str:
|
|
s = s.strip().upper()
|
|
if s in {"LOW", "MEDIUM", "HIGH", "CRITICAL"}:
|
|
return s
|
|
if s == "MODERATE":
|
|
return "MEDIUM"
|
|
return "UNKNOWN"
|
|
|
|
|
|
def _compute_vuln_severity(v: Dict[str, Any]) -> str:
|
|
dbs = (v.get("database_specific") or {})
|
|
if isinstance(dbs, dict) and dbs.get("severity"):
|
|
return _normalize_db_specific_severity(str(dbs.get("severity")))
|
|
|
|
sev = v.get("severity") or []
|
|
if isinstance(sev, list):
|
|
for item in sev:
|
|
if not isinstance(item, dict):
|
|
continue
|
|
score = item.get("score")
|
|
if score is None:
|
|
continue
|
|
try:
|
|
sc = float(str(score))
|
|
return _score_to_severity(sc)
|
|
except ValueError:
|
|
continue
|
|
|
|
return "UNKNOWN"
|
|
|
|
|
|
def _http_get_json(url: str, timeout: int = 30) -> Dict[str, Any]:
|
|
req = urllib.request.Request(url=url, headers={"Accept": "application/json"}, method="GET")
|
|
with urllib.request.urlopen(req, timeout=timeout) as resp:
|
|
body = resp.read().decode("utf-8", errors="ignore")
|
|
return json.loads(body)
|
|
|
|
|
|
def hydrate_vulns(ids: Iterable[str], *, progress_enabled: bool = True, max_ids: int = 300) -> None:
    """Download full vulnerability records for ``ids`` into ``_OSV_VULN_CACHE``.

    IDs are de-duplicated (first occurrence wins) and IDs already present in
    the cache are skipped without counting towards the limit. Download
    failures are ignored: the ID is simply left out of the cache.

    Args:
        ids: Vulnerability IDs to fetch; each one is converted with ``str``.
        progress_enabled: Wrap the download loop in ``progress`` when true.
        max_ids: Upper bound on the number of records fetched by this call.
            A value of zero or less fetches nothing.
    """
    # The cap used to be checked only after appending, so ``max_ids=0``
    # still fetched one record.
    if max_ids <= 0:
        return

    pending: List[str] = []
    seen = set()
    for raw_id in ids:
        vid = str(raw_id)
        if vid in seen:
            continue
        seen.add(vid)
        if vid in _OSV_VULN_CACHE:
            continue
        pending.append(vid)
        if len(pending) >= max_ids:
            break

    it = progress(pending, total=len(pending), desc="OSV: загрузка деталей") if progress_enabled else iter(pending)
    for vid in it:
        try:
            record = _http_get_json(OSV_VULN_URL_TEMPLATE.format(id=vid))
        except Exception:
            # Deliberately best-effort: one failed download must not abort
            # the whole scan; the ID just stays un-hydrated.
            continue
        if isinstance(record, dict):
            _OSV_VULN_CACHE[vid] = record
|
|
|
|
|
|
def _is_exact_version_for_osv(internal_eco: str, spec: Optional[str], scope: Optional[str]) -> Optional[str]:
|
|
if not spec:
|
|
return None
|
|
|
|
s = str(spec).strip()
|
|
sc = (scope or "").lower()
|
|
eco = internal_eco.lower()
|
|
|
|
if sc == "lock":
|
|
return s.lstrip("=")
|
|
|
|
if eco == "go" and sc == "require":
|
|
return s
|
|
|
|
if eco == "pypi":
|
|
if s.startswith("=="):
|
|
return s[2:].strip()
|
|
return None
|
|
|
|
if eco == "npm":
|
|
if any(s.startswith(x) for x in ("^", "~", ">", "<", "*")):
|
|
return None
|
|
if re.match(r"^v?\d+(\.\d+){0,3}([\-\+].+)?$", s):
|
|
return s.lstrip("v")
|
|
return None
|
|
|
|
if eco in {"maven", "gradle", "nuget", "cargo"}:
|
|
if "${" in s or s.startswith(("(", "[", "{")) or any(op in s for op in (">", "<", "*", ",")):
|
|
return None
|
|
if re.match(r"^v?\d+(\.\d+){0,3}([\-\+].+)?$", s):
|
|
return s.lstrip("v")
|
|
return None
|
|
|
|
return None
|
|
|
|
|
|
def _http_post_json(url: str, payload: Dict[str, Any], timeout: int = 30) -> Dict[str, Any]:
|
|
data = json.dumps(payload).encode("utf-8")
|
|
req = urllib.request.Request(
|
|
url=url,
|
|
data=data,
|
|
headers={"Content-Type": "application/json", "Accept": "application/json"},
|
|
method="POST",
|
|
)
|
|
with urllib.request.urlopen(req, timeout=timeout) as resp:
|
|
body = resp.read().decode("utf-8", errors="ignore")
|
|
return json.loads(body)
|
|
|
|
|
|
def _merge_osv_result(target: Dict[str, Any], result: Any) -> Optional[str]:
    """Append ``result``'s vulns to ``target["vulns"]`` and return its next page token, if any."""
    if not isinstance(result, dict):
        # Null or otherwise malformed entries contribute nothing.
        return None
    vulns = result.get("vulns") or []
    if isinstance(vulns, list):
        target["vulns"].extend(vulns)
    return result.get("next_page_token") or None


def querybatch_paginated(queries: List[Dict[str, Any]], max_pages: int = 5) -> List[Dict[str, Any]]:
    """Run a batch query and follow pagination tokens for each query.

    Every response is matched to ``queries`` by position. Queries that return
    a ``next_page_token`` are re-sent with ``page_token`` set until no tokens
    remain or ``max_pages`` requests have been made.

    Args:
        queries: Query objects to send; the list itself is not modified.
        max_pages: Maximum number of requests, including the first one.

    Returns:
        One dict per query, in the same order, each with a ``"vulns"`` list.
        An entry also carries ``"truncated": True`` when further pages were
        known to exist but were not fetched.

    Raises:
        Whatever ``_http_post_json`` raises (network and JSON errors).
    """
    if not queries:
        return []

    acc: List[Dict[str, Any]] = [{"vulns": []} for _ in queries]

    data = _http_post_json(OSV_QUERYBATCH_URL, {"queries": queries})
    results = data.get("results", []) if isinstance(data, dict) else None
    if not isinstance(results, list):
        return acc

    # zip() bounds the walk by len(queries), so an over-long response cannot
    # index past the end of ``acc``.
    next_tokens: Dict[int, str] = {}
    for i, r in zip(range(len(queries)), results):
        tok = _merge_osv_result(acc[i], r)
        if tok:
            next_tokens[i] = tok

    pages = 1
    while next_tokens and pages < max_pages:
        index_map: List[int] = list(next_tokens)
        page_queries: List[Dict[str, Any]] = []
        for orig_idx in index_map:
            q = dict(queries[orig_idx])
            q["page_token"] = next_tokens[orig_idx]
            page_queries.append(q)

        data = _http_post_json(OSV_QUERYBATCH_URL, {"queries": page_queries})
        page_results = data.get("results", []) if isinstance(data, dict) else None
        if not isinstance(page_results, list):
            # Malformed follow-up page: stop, and let the still-pending
            # tokens mark their queries as truncated below.
            break

        new_next: Dict[int, str] = {}
        for orig_idx, r in zip(index_map, page_results):
            tok = _merge_osv_result(acc[orig_idx], r)
            if tok:
                new_next[orig_idx] = tok

        # Queries the server did not answer on this page are incomplete too.
        for orig_idx in index_map[len(page_results):]:
            acc[orig_idx]["truncated"] = True

        next_tokens = new_next
        pages += 1

    for idx in next_tokens:
        acc[idx]["truncated"] = True

    return acc
|
|
|
|
|
|
def _pinned_osv_deps(container: Dict[str, Any]) -> List[Dict[str, str]]:
    """Return the container's unique dependencies that pin an exact version in a mapped ecosystem."""
    pinned: List[Dict[str, str]] = []
    seen = set()
    for dep in dedupe_effective(container.get("dependencies") or []):
        internal_eco = (dep.get("ecosystem") or "").lower()
        name = dep.get("name")
        if not internal_eco or not name:
            continue

        osv_eco = OSV_ECOSYSTEM_MAP.get(internal_eco)
        if not osv_eco:
            continue

        ver = _is_exact_version_for_osv(internal_eco, dep.get("spec"), dep.get("scope"))
        if not ver:
            continue

        key = (osv_eco, str(name), str(ver))
        if key in seen:
            continue
        seen.add(key)
        pinned.append({"ecosystem": osv_eco, "name": key[1], "version": key[2]})
    return pinned


def _reset_osv_summary(container: Dict[str, Any]) -> None:
    """Write an empty vulnerability summary into ``container``."""
    container["osv_vuln_count"] = 0
    container["osv_vuln_counts_by_severity"] = {}
    container["osv_affected_deps"] = []
    container["osv_vulns_by_dep"] = {}


def _query_osv_into_cache(to_query: List[Tuple[str, str, str]], chunk_size: int, progress_enabled: bool) -> None:
    """Query ``to_query`` in chunks and store each package's unique vulnerability IDs in ``_OSV_RESULT_CACHE``."""
    # A non-positive chunk size would make range() fail or yield nothing.
    step = max(1, chunk_size)
    starts = range(0, len(to_query), step)
    chunk_it = progress(starts, total=len(starts), desc="OSV: querybatch") if progress_enabled else iter(starts)

    for start in chunk_it:
        batch = to_query[start:start + step]
        queries = [
            {"package": {"ecosystem": eco, "name": name}, "version": ver}
            for (eco, name, ver) in batch
        ]
        results = querybatch_paginated(queries)

        for key, result in zip(batch, results):
            vulns = result.get("vulns") if isinstance(result, dict) else None
            if not isinstance(vulns, list):
                vulns = []
            ids = [str(v["id"]) for v in vulns if isinstance(v, dict) and v.get("id")]
            # dict.fromkeys removes duplicates while keeping first-seen order.
            _OSV_RESULT_CACHE[key] = list(dict.fromkeys(ids))


def _summarize_container_osv(
    container: Dict[str, Any],
    threshold: int,
    hydrate_details: bool,
    include_unknown: bool,
) -> None:
    """Fill ``container``'s summary fields from the cached results for its pinned dependencies."""
    vulns_by_dep: Dict[str, List[str]] = {}
    affected: List[Tuple[str, str, str, int, str]] = []
    unique_ids = set()
    # NOTE(review): this counts once per (dependency, vulnerability) pair, so
    # the totals can exceed ``osv_vuln_count``, which counts unique IDs.
    counts: Dict[str, int] = defaultdict(int)

    for p in container.get("osv_pinned_deps") or []:
        eco = p["ecosystem"]
        name = p["name"]
        ver = p["version"]
        ids = _OSV_RESULT_CACHE.get((eco, name, ver), [])
        if not ids:
            continue

        kept: List[str] = []
        max_dep_sev = "UNKNOWN"
        for vid in ids:
            sev = "UNKNOWN"
            if hydrate_details and vid in _OSV_VULN_CACHE:
                sev = _compute_vuln_severity(_OSV_VULN_CACHE[vid])
            if _sev_rank(sev) >= threshold or (sev == "UNKNOWN" and include_unknown):
                kept.append(vid)
                unique_ids.add(vid)
                counts[sev] += 1
                if _sev_rank(sev) > _sev_rank(max_dep_sev):
                    max_dep_sev = sev

        if kept:
            vulns_by_dep[f"{eco}:{name}@{ver}"] = kept
            affected.append((eco, name, ver, len(kept), max_dep_sev))

    # Most vulnerabilities first; ties are broken by the highest severity.
    affected.sort(key=lambda x: (x[3], _sev_rank(x[4])), reverse=True)
    container["osv_vuln_count"] = len(unique_ids)
    container["osv_vuln_counts_by_severity"] = dict(counts)
    container["osv_affected_deps"] = affected
    container["osv_vulns_by_dep"] = vulns_by_dep


def annotate_containers_with_osv(
    containers: List[Dict[str, Any]],
    *,
    chunk_size: int = 250,
    hydrate_details: bool = True,
    max_hydrate_ids: int = 300,
    min_severity: str = "MEDIUM",
    include_unknown: bool = True,
    progress_enabled: bool = True,
) -> None:
    """Annotate each container dict in place with vulnerability data for its pinned dependencies.

    Fields written to every container:

    * ``osv_pinned_deps`` -- unique ``{"ecosystem", "name", "version"}`` dicts
      for dependencies with an exact version in a mapped ecosystem.
    * ``osv_vuln_count`` -- number of unique vulnerability IDs kept.
    * ``osv_vuln_counts_by_severity`` -- severity label -> count.
    * ``osv_affected_deps`` -- ``(ecosystem, name, version, kept_count,
      max_severity)`` tuples, most affected first.
    * ``osv_vulns_by_dep`` -- ``"eco:name@version"`` -> kept IDs.
    * ``osv_errors`` -- set only when the batch query fails; the other
      summary fields are then reset to empty values.

    Packages already in ``_OSV_RESULT_CACHE`` are not queried again, but
    their cached results are still included in the summary.

    Args:
        containers: Container dicts; each may carry a ``"dependencies"`` list.
        chunk_size: Number of packages per batch request (minimum 1).
        hydrate_details: Fetch full records so severities can be computed;
            without them every vulnerability is treated as ``UNKNOWN``.
        max_hydrate_ids: Cap on the number of records fetched by this call.
        min_severity: Lowest severity label to keep.
        include_unknown: Keep ``UNKNOWN`` vulnerabilities regardless of
            ``min_severity``.
        progress_enabled: Wrap the network loops in ``progress``.
    """
    if not containers:
        return

    to_query: List[Tuple[str, str, str]] = []
    queued = set()  # set mirror of to_query for O(1) membership tests
    for c in containers:
        pinned = _pinned_osv_deps(c)
        c["osv_pinned_deps"] = pinned
        for p in pinned:
            key = (p["ecosystem"], p["name"], p["version"])
            if key not in _OSV_RESULT_CACHE and key not in queued:
                queued.add(key)
                to_query.append(key)

    # An empty to_query only means nothing new must be fetched. Cached
    # results still have to be summarised below, so there is no early return.
    if to_query:
        try:
            _query_osv_into_cache(to_query, chunk_size, progress_enabled)
        except (urllib.error.URLError, urllib.error.HTTPError, TimeoutError, ValueError) as e:
            for c in containers:
                c["osv_errors"] = f"OSV query failed: {type(e).__name__}: {e}"
                _reset_osv_summary(c)
            return

    if hydrate_details:
        # Hydrate only IDs relevant to these containers, so the cap is not
        # used up by unrelated entries left in the module-level cache.
        wanted: Dict[str, None] = {}
        for c in containers:
            for p in c["osv_pinned_deps"]:
                for vid in _OSV_RESULT_CACHE.get((p["ecosystem"], p["name"], p["version"]), []):
                    wanted.setdefault(vid, None)
        if wanted:
            hydrate_vulns(list(wanted), progress_enabled=progress_enabled, max_ids=max_hydrate_ids)

    threshold = _sev_rank(min_severity)
    for c in containers:
        _summarize_container_osv(c, threshold, hydrate_details, include_unknown)
|