+full refactor

+feat: configuration, progress bar, OSV
This commit is contained in:
2026-01-18 13:54:14 +03:00
parent b8c25b2529
commit a5714116ac
730 changed files with 246974 additions and 150 deletions
+53
View File
@@ -0,0 +1,53 @@
from __future__ import annotations
SKIP_DIRS: set[str] = {
".git",
"node_modules",
"vendor",
"dist",
"build",
"target",
".venv",
"__pycache__",
".idea",
".vscode",
}
MANIFEST_NAMES: set[str] = {
# Python
"requirements.txt",
"Pipfile",
"pyproject.toml",
"poetry.lock",
# Node
"package.json",
"package-lock.json",
"yarn.lock",
"pnpm-lock.yaml",
# Go
"go.mod",
"go.sum",
# Rust
"Cargo.toml",
"Cargo.lock",
# Java
"pom.xml",
"build.gradle",
"build.gradle.kts",
# .NET
"packages.config",
}
OSV_QUERYBATCH_URL = "https://api.osv.dev/v1/querybatch"
OSV_ECOSYSTEM_MAP: dict[str, str] = {
"pypi": "PyPI",
"npm": "npm",
"go": "Go",
"cargo": "crates.io",
"maven": "Maven",
"gradle": "Maven",
"nuget": "NuGet",
}
+35
View File
@@ -0,0 +1,35 @@
from __future__ import annotations
import io
import tarfile
from typing import Tuple
def exec_shell(container, cmd: str) -> Tuple[int, str]:
for shell in (["sh", "-lc"], ["bash", "-lc"]):
try:
res = container.exec_run(shell + [cmd], demux=False)
out = res.output.decode(errors="ignore") if isinstance(res.output, (bytes, bytearray)) else str(res.output)
return res.exit_code, out
except Exception:
continue
return 127, ""
def read_text_from_container(container, abs_path: str, max_bytes: int = 800_000) -> str:
stream, _stat = container.get_archive(abs_path)
buf = io.BytesIO()
for chunk in stream:
buf.write(chunk)
if buf.tell() > max_bytes:
break
buf.seek(0)
with tarfile.open(fileobj=buf) as tar:
members = tar.getmembers()
if not members:
raise FileNotFoundError(abs_path)
f = tar.extractfile(members[0])
if f is None:
raise FileNotFoundError(abs_path)
return f.read().decode(errors="ignore")
+215
View File
@@ -0,0 +1,215 @@
from __future__ import annotations
import os
import shlex
import shutil
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple
from .config import MANIFEST_NAMES
from .container_io import exec_shell, read_text_from_container
from .git_repo import checkout_repo, find_manifests
from .parsers import parse_manifest_by_name
_REPO_CACHE: Dict[Tuple[str, Optional[str]], Dict[str, Any]] = {}
def dedupe_deps_exact(deps: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
seen = set()
out = []
for d in deps:
key = (d.get("ecosystem"), d.get("name"), d.get("spec"), d.get("scope"))
if key in seen:
continue
seen.add(key)
out.append(d)
return out
def _parse_semverish(v: Optional[str]) -> Tuple[int, int, int, int]:
if not v:
return (0, 0, 0, 0)
s = str(v).strip().lstrip("=v")
import re
m = re.match(r"^(\d+)(?:\.(\d+))?(?:\.(\d+))?(?:\.(\d+))?", s)
if not m:
return (0, 0, 0, 0)
parts = [int(x) if x is not None else 0 for x in m.groups()]
return tuple(parts) # type: ignore
def _priority(eco: str, spec: Optional[str], scope: Optional[str]) -> int:
scope_l = (scope or "").lower()
spec_s = (spec or "").strip()
if scope_l == "lock":
return 100
if eco == "go":
if scope_l == "require":
return 90
if scope_l == "sum":
return 10
if eco == "npm":
if scope_l == "dependencies":
return 80
if scope_l in {"optionaldependencies", "peerdependencies"}:
return 60
if scope_l == "devdependencies":
return 15
if eco == "pypi":
if spec_s.startswith("=="):
return 75
if spec_s:
return 55
return 40
if eco in {"cargo", "nuget", "maven", "gradle"}:
return 60
return 50
def dedupe_effective(deps: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
best: Dict[Tuple[str, str], Dict[str, Any]] = {}
for d in deps:
eco = d.get("ecosystem")
name = d.get("name")
if not eco or not name:
continue
spec = d.get("spec")
scope = d.get("scope")
pr = _priority(str(eco), spec, scope)
key = (str(eco), str(name))
if key not in best:
dd = dict(d)
dd["_pr"] = pr
best[key] = dd
continue
cur = best[key]
if pr > cur["_pr"]:
dd = dict(d)
dd["_pr"] = pr
best[key] = dd
elif pr == cur["_pr"]:
if _parse_semverish(str(spec) if spec else "") > _parse_semverish(str(cur.get("spec") or "")):
dd = dict(d)
dd["_pr"] = pr
best[key] = dd
out = list(best.values())
for x in out:
x.pop("_pr", None)
return out
def find_manifest_paths_in_container(container, workdir: str, max_depth: int = 4, limit: int = 12) -> List[str]:
wd_q = shlex.quote(workdir or "/")
find_cmd = (
f"cd {wd_q} 2>/dev/null || cd /; "
f"find . -maxdepth {max_depth} -type f \\( "
# Python
"-name 'requirements*.txt' -o "
"-name 'Pipfile' -o "
"-name 'pyproject.toml' -o "
"-name 'poetry.lock' -o "
# Node
"-name 'package.json' -o "
"-name 'package-lock.json' -o "
"-name 'yarn.lock' -o "
"-name 'pnpm-lock.yaml' -o "
# Go
"-name 'go.mod' -o "
"-name 'go.sum' -o "
# Rust
"-name 'Cargo.toml' -o "
"-name 'Cargo.lock' -o "
# Java
"-name 'pom.xml' -o "
"-name 'build.gradle' -o "
"-name 'build.gradle.kts' -o "
# .NET
"-name '*.csproj' -o "
"-name '*.fsproj' -o "
"-name 'packages.config' "
"\\) 2>/dev/null | head -n " + str(limit) #Сколько ж ещё сюда добавлять........
)
exit_code, out = exec_shell(container, find_cmd)
if exit_code != 0:
return []
return [line.strip() for line in out.splitlines() if line.strip()]
def extract_deps_from_container(container, workdir: str, rel_paths: List[str]) -> Dict[str, Any]:
manifests: List[str] = []
deps: List[Dict[str, Any]] = []
errors: List[str] = []
for rel in rel_paths[:12]:
rel = rel.strip()
if rel.startswith("./"):
abs_path = workdir.rstrip("/") + "/" + rel[2:]
elif rel.startswith("/"):
abs_path = rel
else:
abs_path = workdir.rstrip("/") + "/" + rel
try:
text = read_text_from_container(container, abs_path)
manifests.append(abs_path)
deps.extend(parse_manifest_by_name(abs_path, text))
except Exception as e:
errors.append(f"{abs_path}: {type(e).__name__}: {e}")
deps = dedupe_deps_exact(deps)
return {"manifests": manifests, "dependencies": deps, "errors": errors}
def extract_deps_from_repo(source_url: str, revision: Optional[str], max_manifests: int = 12) -> Dict[str, Any]:
key = (str(source_url), revision)
if key in _REPO_CACHE:
return _REPO_CACHE[key]
repo_dir: Optional[Path] = None
try:
repo_dir = checkout_repo(source_url, revision)
found = find_manifests(repo_dir, max_files=25)
result: Dict[str, Any] = {
"source_url": source_url,
"revision": revision,
"manifests": [str(p.relative_to(repo_dir)) for p in found],
"dependencies": [],
"errors": [],
}
for p in found[:max_manifests]:
rel = str(p.relative_to(repo_dir))
try:
text = p.read_text(errors="ignore")
result["dependencies"].extend(parse_manifest_by_name(p.name, text))
except Exception as e:
result["errors"].append(f"{rel}: {type(e).__name__}: {e}")
result["dependencies"] = dedupe_deps_exact(result["dependencies"])
_REPO_CACHE[key] = result
return result
except Exception as e:
result = {
"source_url": source_url,
"revision": revision,
"manifests": [],
"dependencies": [],
"errors": [f"{type(e).__name__}: {e}"],
}
_REPO_CACHE[key] = result
return result
finally:
if repo_dir:
shutil.rmtree(repo_dir, ignore_errors=True)
+93
View File
@@ -0,0 +1,93 @@
from __future__ import annotations
import os
import re
import shutil
import subprocess
import tempfile
from pathlib import Path
from typing import Optional, List
from urllib.parse import urlparse
from .config import SKIP_DIRS, MANIFEST_NAMES
def _safe_run(cmd: List[str], cwd: Optional[str | Path] = None, timeout: int = 120) -> subprocess.CompletedProcess:
env = dict(os.environ)
env["GIT_TERMINAL_PROMPT"] = "0" # do not hang on private repos
return subprocess.run(cmd, cwd=cwd, env=env, capture_output=True, text=True, timeout=timeout)
def normalize_git_url(url: str) -> str:
if not url:
return url
url = url.strip()
m = re.match(r"git@([^:]+):(.+)$", url)
if m:
host, path = m.group(1), m.group(2)
if path.endswith(".git"):
path = path[:-4]
return f"https://{host}/{path}".rstrip("/")
parsed = urlparse(url)
if parsed.scheme in ("http", "https"):
clean_path = parsed.path
for marker in ("/tree/", "/blob/"):
idx = clean_path.find(marker)
if idx != -1:
clean_path = clean_path[:idx]
if clean_path.endswith(".git"):
clean_path = clean_path[:-4]
clean_path = clean_path.rstrip("/")
return f"{parsed.scheme}://{parsed.netloc}{clean_path}"
return url[:-4] if url.endswith(".git") else url
def checkout_repo(url: str, revision: Optional[str]) -> Path:
url = normalize_git_url(url)
tmpdir = Path(tempfile.mkdtemp(prefix="cvexplorer_repo_"))
def must(p: subprocess.CompletedProcess, cmd: List[str]):
if p.returncode != 0:
raise RuntimeError(f"Command failed: {' '.join(cmd)}\n{p.stderr.strip()}")
if revision:
p = _safe_run(["git", "init"], cwd=tmpdir, timeout=30)
must(p, ["git", "init"])
p = _safe_run(["git", "remote", "add", "origin", url], cwd=tmpdir, timeout=30)
must(p, ["git", "remote", "add", "origin", url])
p = _safe_run(["git", "fetch", "--depth", "1", "origin", revision], cwd=tmpdir, timeout=120)
must(p, ["git", "fetch", "--depth", "1", "origin", revision])
p = _safe_run(["git", "checkout", "FETCH_HEAD"], cwd=tmpdir, timeout=30)
must(p, ["git", "checkout", "FETCH_HEAD"])
else:
shutil.rmtree(tmpdir, ignore_errors=True)
p = _safe_run(["git", "clone", "--depth", "1", url, str(tmpdir)], timeout=180)
must(p, ["git", "clone", "--depth", "1", url, str(tmpdir)])
return tmpdir
def find_manifests(repo_dir: Path, max_files: int = 25) -> List[Path]:
found: List[Path] = []
for root, dirs, files in os.walk(repo_dir):
dirs[:] = [d for d in dirs if d not in SKIP_DIRS]
for fname in files:
if fname in MANIFEST_NAMES or fname.endswith(".csproj") or fname.endswith(".fsproj"):
found.append(Path(root) / fname)
if len(found) >= max_files:
break
if len(found) >= max_files:
break
found.sort(key=lambda p: len(p.relative_to(repo_dir).parts))
return found
+46
View File
@@ -0,0 +1,46 @@
from __future__ import annotations
from .scanner import scan_running_containers
from .osv_client import annotate_containers_with_osv
from .report import print_report
from .user_config import load_config
def main() -> int:
cfg = load_config()
progress_enabled = bool(cfg.get("progress", {}).get("enabled", True))
containers = scan_running_containers(progress_enabled=progress_enabled)
# Compare against OSV (with severity filtering)
osv_cfg = cfg.get("osv", {}) or {}
vulns_cfg = cfg.get("vulns", {}) or {}
annotate_containers_with_osv(
containers,
chunk_size=int(osv_cfg.get("chunk_size", 250)),
hydrate_details=bool(osv_cfg.get("hydrate_details", True)),
max_hydrate_ids=int(osv_cfg.get("max_hydrate_ids", 300)),
min_severity=str(vulns_cfg.get("min_severity", "MEDIUM")),
include_unknown=bool(vulns_cfg.get("include_unknown", True)),
progress_enabled=progress_enabled,
)
out_cfg = cfg.get("output", {}) or {}
print_report(
containers,
group_by_service=bool(out_cfg.get("group_by_service", True)),
sections=out_cfg.get("sections", {}) or {},
max_deps_per_ecosystem=int(out_cfg.get("max_deps_per_ecosystem", 20)),
max_dev_deps_per_ecosystem=int(out_cfg.get("max_dev_deps_per_ecosystem", 10)),
top_affected=int(vulns_cfg.get("top_affected", 8)),
)
print("=" * 88)
print(f"Всего контейнеров проанализировано: {len(containers)}")
return 0
if __name__ == "__main__":
raise SystemExit(main())
+354
View File
@@ -0,0 +1,354 @@
from __future__ import annotations
import json
import re
import urllib.request
import urllib.error
from collections import defaultdict
from typing import List, Dict, Any, Tuple, Optional, Iterable
from .config import OSV_QUERYBATCH_URL, OSV_ECOSYSTEM_MAP
from .deps_pipeline import dedupe_effective
from .progress import progress
OSV_VULN_URL_TEMPLATE = "https://api.osv.dev/v1/vulns/{id}"
_OSV_RESULT_CACHE: Dict[Tuple[str, str, str], List[str]] = {}
_OSV_VULN_CACHE: Dict[str, Dict[str, Any]] = {}
_SEVERITY_ORDER = {"UNKNOWN": 0, "LOW": 1, "MEDIUM": 2, "HIGH": 3, "CRITICAL": 4}
def _sev_rank(level: str) -> int:
return _SEVERITY_ORDER.get(level.upper(), 0)
def _score_to_severity(score: float) -> str:
if score >= 9.0:
return "CRITICAL"
if score >= 7.0:
return "HIGH"
if score >= 4.0:
return "MEDIUM"
if score > 0.0:
return "LOW"
return "UNKNOWN"
def _normalize_db_specific_severity(s: str) -> str:
s = s.strip().upper()
if s in {"LOW", "MEDIUM", "HIGH", "CRITICAL"}:
return s
if s == "MODERATE":
return "MEDIUM"
return "UNKNOWN"
def _compute_vuln_severity(v: Dict[str, Any]) -> str:
dbs = (v.get("database_specific") or {})
if isinstance(dbs, dict) and dbs.get("severity"):
return _normalize_db_specific_severity(str(dbs.get("severity")))
sev = v.get("severity") or []
if isinstance(sev, list):
for item in sev:
if not isinstance(item, dict):
continue
score = item.get("score")
if score is None:
continue
try:
sc = float(str(score))
return _score_to_severity(sc)
except ValueError:
continue
return "UNKNOWN"
def _http_get_json(url: str, timeout: int = 30) -> Dict[str, Any]:
req = urllib.request.Request(url=url, headers={"Accept": "application/json"}, method="GET")
with urllib.request.urlopen(req, timeout=timeout) as resp:
body = resp.read().decode("utf-8", errors="ignore")
return json.loads(body)
def hydrate_vulns(ids: Iterable[str], *, progress_enabled: bool = True, max_ids: int = 300) -> None:
uniq: List[str] = []
seen = set()
for vid in ids:
vid = str(vid)
if vid in seen:
continue
seen.add(vid)
if vid in _OSV_VULN_CACHE:
continue
uniq.append(vid)
if len(uniq) >= max_ids:
break
it = progress(uniq, total=len(uniq), desc="OSV: загрузка деталей") if progress_enabled else iter(uniq)
for vid in it:
try:
v = _http_get_json(OSV_VULN_URL_TEMPLATE.format(id=vid))
if isinstance(v, dict):
_OSV_VULN_CACHE[vid] = v
except Exception:
continue
def _is_exact_version_for_osv(internal_eco: str, spec: Optional[str], scope: Optional[str]) -> Optional[str]:
if not spec:
return None
s = str(spec).strip()
sc = (scope or "").lower()
eco = internal_eco.lower()
if sc == "lock":
return s.lstrip("=")
if eco == "go" and sc == "require":
return s
if eco == "pypi":
if s.startswith("=="):
return s[2:].strip()
return None
if eco == "npm":
if any(s.startswith(x) for x in ("^", "~", ">", "<", "*")):
return None
if re.match(r"^v?\d+(\.\d+){0,3}([\-\+].+)?$", s):
return s.lstrip("v")
return None
if eco in {"maven", "gradle", "nuget", "cargo"}:
if "${" in s or s.startswith(("(", "[", "{")) or any(op in s for op in (">", "<", "*", ",")):
return None
if re.match(r"^v?\d+(\.\d+){0,3}([\-\+].+)?$", s):
return s.lstrip("v")
return None
return None
def _http_post_json(url: str, payload: Dict[str, Any], timeout: int = 30) -> Dict[str, Any]:
data = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(
url=url,
data=data,
headers={"Content-Type": "application/json", "Accept": "application/json"},
method="POST",
)
with urllib.request.urlopen(req, timeout=timeout) as resp:
body = resp.read().decode("utf-8", errors="ignore")
return json.loads(body)
def querybatch_paginated(queries: List[Dict[str, Any]], max_pages: int = 5) -> List[Dict[str, Any]]:
if not queries:
return []
data = _http_post_json(OSV_QUERYBATCH_URL, {"queries": queries})
results = data.get("results", [])
if not isinstance(results, list):
return [{"vulns": []} for _ in queries]
acc: List[Dict[str, Any]] = [{"vulns": []} for _ in queries]
next_tokens: Dict[int, str] = {}
for i, r in enumerate(results):
vulns = (r or {}).get("vulns", []) or []
if isinstance(vulns, list):
acc[i]["vulns"].extend(vulns)
tok = (r or {}).get("next_page_token")
if tok:
next_tokens[i] = tok
pages = 1
while next_tokens and pages < max_pages:
page_queries: List[Dict[str, Any]] = []
index_map: List[int] = []
for orig_idx, tok in next_tokens.items():
q = dict(queries[orig_idx])
q["page_token"] = tok
page_queries.append(q)
index_map.append(orig_idx)
data = _http_post_json(OSV_QUERYBATCH_URL, {"queries": page_queries})
page_results = data.get("results", [])
new_next: Dict[int, str] = {}
for j, r in enumerate(page_results):
orig_idx = index_map[j]
vulns = (r or {}).get("vulns", []) or []
if isinstance(vulns, list):
acc[orig_idx]["vulns"].extend(vulns)
tok = (r or {}).get("next_page_token")
if tok:
new_next[orig_idx] = tok
next_tokens = new_next
pages += 1
if next_tokens:
for idx in next_tokens.keys():
acc[idx]["truncated"] = True
return acc
def annotate_containers_with_osv(
containers: List[Dict[str, Any]],
*,
chunk_size: int = 250,
hydrate_details: bool = True,
max_hydrate_ids: int = 300,
min_severity: str = "MEDIUM",
include_unknown: bool = True,
progress_enabled: bool = True,
) -> None:
to_query: List[Tuple[str, str, str]] = []
for c in containers:
deps = c.get("dependencies") or []
deps_eff = dedupe_effective(deps)
pinned: List[Dict[str, str]] = []
for d in deps_eff:
internal_eco = (d.get("ecosystem") or "").lower()
name = d.get("name")
spec = d.get("spec")
scope = d.get("scope")
if not internal_eco or not name:
continue
osv_eco = OSV_ECOSYSTEM_MAP.get(internal_eco)
if not osv_eco:
continue
ver = _is_exact_version_for_osv(internal_eco, spec, scope)
if not ver:
continue
pinned.append({"ecosystem": osv_eco, "name": str(name), "version": str(ver)})
seen = set()
pinned_u: List[Dict[str, str]] = []
for p in pinned:
k = (p["ecosystem"], p["name"], p["version"])
if k in seen:
continue
seen.add(k)
pinned_u.append(p)
if k not in _OSV_RESULT_CACHE and k not in to_query:
to_query.append(k)
c["osv_pinned_deps"] = pinned_u
if not to_query:
for c in containers:
c["osv_vuln_count"] = 0
c["osv_vuln_counts_by_severity"] = {}
c["osv_affected_deps"] = []
c["osv_vulns_by_dep"] = {}
return
try:
total_chunks = (len(to_query) + chunk_size - 1) // chunk_size
chunk_indices = range(0, len(to_query), chunk_size)
chunk_it = progress(chunk_indices, total=total_chunks, desc="OSV: querybatch") if progress_enabled else iter(chunk_indices)
for start in chunk_it:
batch = to_query[start:start + chunk_size]
queries = [
{"package": {"ecosystem": eco, "name": name}, "version": ver}
for (eco, name, ver) in batch
]
results = querybatch_paginated(queries)
for i, r in enumerate(results):
eco, name, ver = batch[i]
vulns = (r or {}).get("vulns", []) or []
ids: List[str] = []
for v in vulns:
vid = (v or {}).get("id")
if vid:
ids.append(str(vid))
seen = set()
ids_u: List[str] = []
for x in ids:
if x in seen:
continue
seen.add(x)
ids_u.append(x)
_OSV_RESULT_CACHE[(eco, name, ver)] = ids_u
except (urllib.error.URLError, urllib.error.HTTPError, TimeoutError, ValueError) as e:
for c in containers:
c["osv_errors"] = f"OSV query failed: {type(e).__name__}: {e}"
c["osv_vuln_count"] = 0
c["osv_vuln_counts_by_severity"] = {}
c["osv_affected_deps"] = []
c["osv_vulns_by_dep"] = {}
return
if hydrate_details:
all_ids: List[str] = []
seen = set()
for ids in _OSV_RESULT_CACHE.values():
for vid in ids:
if vid in seen:
continue
seen.add(vid)
all_ids.append(vid)
hydrate_vulns(all_ids, progress_enabled=progress_enabled, max_ids=max_hydrate_ids)
threshold = _sev_rank(min_severity)
for c in containers:
pinned = c.get("osv_pinned_deps") or []
vulns_by_dep: Dict[str, List[str]] = {}
affected: List[Tuple[str, str, str, int, str]] = []
all_ids = set()
counts: Dict[str, int] = defaultdict(int)
for p in pinned:
eco = p["ecosystem"]
name = p["name"]
ver = p["version"]
ids = _OSV_RESULT_CACHE.get((eco, name, ver), [])
if not ids:
continue
kept: List[str] = []
max_dep_sev = "UNKNOWN"
for vid in ids:
sev = "UNKNOWN"
if hydrate_details and vid in _OSV_VULN_CACHE:
sev = _compute_vuln_severity(_OSV_VULN_CACHE[vid])
if _sev_rank(sev) >= threshold or (sev == "UNKNOWN" and include_unknown):
kept.append(vid)
all_ids.add(vid)
counts[sev] += 1
if _sev_rank(sev) > _sev_rank(max_dep_sev):
max_dep_sev = sev
if kept:
key_str = f"{eco}:{name}@{ver}"
vulns_by_dep[key_str] = kept
affected.append((eco, name, ver, len(kept), max_dep_sev))
affected.sort(key=lambda x: (x[3], _sev_rank(x[4])), reverse=True)
c["osv_vuln_count"] = len(all_ids)
c["osv_vuln_counts_by_severity"] = dict(counts)
c["osv_affected_deps"] = affected
c["osv_vulns_by_dep"] = vulns_by_dep
+377
View File
@@ -0,0 +1,377 @@
from __future__ import annotations
import json
import os
import re
import xml.etree.ElementTree as ET
from typing import List, Dict, Any, Optional
from .toml_compat import loads as toml_loads
def _xml_text(elem: ET.Element, tag: str) -> Optional[str]:
def ns_strip(s: str) -> str:
return s.split("}", 1)[-1] if "}" in s else s
for child in elem:
if ns_strip(child.tag) == tag and child.text:
return child.text.strip()
return None
def parse_requirements_txt(text: str) -> List[Dict[str, Any]]:
deps: List[Dict[str, Any]] = []
for line in text.splitlines():
line = line.strip()
if not line or line.startswith("#"):
continue
if "://" in line or line.startswith("git+"):
deps.append({"ecosystem": "pypi", "name": line, "spec": None, "note": "non-standard", "scope": "requirements"})
continue
m = re.match(r"^([A-Za-z0-9_.-]+)\s*([<>=!~]=?)\s*([^\s;]+)", line)
if m:
name, op, ver = m.group(1), m.group(2), m.group(3)
deps.append({"ecosystem": "pypi", "name": name, "spec": f"{op}{ver}", "scope": "requirements"})
else:
deps.append({"ecosystem": "pypi", "name": line, "spec": None, "scope": "requirements"})
return deps
def parse_pipfile(text: str) -> List[Dict[str, Any]]:
deps: List[Dict[str, Any]] = []
data = toml_loads(text)
for section, scope in (("packages", "dependencies"), ("dev-packages", "devDependencies")):
block = data.get(section, {}) or {}
for name, spec in block.items():
if isinstance(spec, str):
deps.append({"ecosystem": "pypi", "name": name, "spec": spec, "scope": scope})
elif isinstance(spec, dict):
deps.append({"ecosystem": "pypi", "name": name, "spec": spec.get("version"), "scope": scope})
else:
deps.append({"ecosystem": "pypi", "name": name, "spec": None, "scope": scope})
return deps
def parse_pyproject_toml(text: str) -> List[Dict[str, Any]]:
deps: List[Dict[str, Any]] = []
data = toml_loads(text)
# PEP 621
proj = data.get("project", {}) or {}
for item in proj.get("dependencies", []) or []:
if isinstance(item, str):
deps.append({"ecosystem": "pypi", "name": item, "spec": None, "scope": "project.dependencies"})
opt = proj.get("optional-dependencies", {}) or {}
for group, items in opt.items():
for item in items or []:
if isinstance(item, str):
deps.append({"ecosystem": "pypi", "name": item, "spec": None, "scope": f"optional:{group}"})
# Poetry
tool = data.get("tool", {}) or {}
poetry = (tool.get("poetry", {}) or {})
for section, scope in (("dependencies", "poetry.dependencies"), ("dev-dependencies", "poetry.dev-dependencies")):
block = poetry.get(section, {}) or {}
for name, spec in block.items():
if name.lower() == "python":
continue
if isinstance(spec, str):
deps.append({"ecosystem": "pypi", "name": name, "spec": spec, "scope": scope})
elif isinstance(spec, dict):
deps.append({"ecosystem": "pypi", "name": name, "spec": spec.get("version"), "scope": scope})
else:
deps.append({"ecosystem": "pypi", "name": name, "spec": None, "scope": scope})
group = poetry.get("group", {}) or {}
for gname, gdata in group.items():
block = (gdata or {}).get("dependencies", {}) or {}
for name, spec in block.items():
if name.lower() == "python":
continue
if isinstance(spec, str):
deps.append({"ecosystem": "pypi", "name": name, "spec": spec, "scope": f"poetry.group.{gname}"})
elif isinstance(spec, dict):
deps.append({"ecosystem": "pypi", "name": name, "spec": spec.get("version"), "scope": f"poetry.group.{gname}"})
else:
deps.append({"ecosystem": "pypi", "name": name, "spec": None, "scope": f"poetry.group.{gname}"})
return deps
def parse_poetry_lock(text: str) -> List[Dict[str, Any]]:
deps: List[Dict[str, Any]] = []
data = toml_loads(text)
pkgs = data.get("package", []) or []
for p in pkgs:
name = p.get("name")
ver = p.get("version")
if name and ver:
deps.append({"ecosystem": "pypi", "name": name, "spec": ver, "scope": "lock"})
return deps
def parse_package_json(text: str) -> List[Dict[str, Any]]:
data = json.loads(text)
deps: List[Dict[str, Any]] = []
for section in ("dependencies", "devDependencies", "optionalDependencies", "peerDependencies"):
block = data.get(section, {}) or {}
for name, spec in block.items():
deps.append({"ecosystem": "npm", "name": name, "spec": spec, "scope": section})
return deps
def parse_package_lock_json(text: str) -> List[Dict[str, Any]]:
data = json.loads(text)
deps: List[Dict[str, Any]] = []
if isinstance(data.get("packages"), dict):
for k, v in data["packages"].items():
if not k or not k.startswith("node_modules/"):
continue
name = k[len("node_modules/"):]
ver = (v or {}).get("version")
if name and ver:
deps.append({"ecosystem": "npm", "name": name, "spec": ver, "scope": "lock"})
return deps
if isinstance(data.get("dependencies"), dict):
for name, v in data["dependencies"].items():
ver = (v or {}).get("version")
if name and ver:
deps.append({"ecosystem": "npm", "name": name, "spec": ver, "scope": "lock"})
return deps
return deps
def parse_yarn_lock(text: str) -> List[Dict[str, Any]]:
deps: List[Dict[str, Any]] = []
current_key: Optional[str] = None
for raw in text.splitlines():
line = raw.rstrip()
if not line:
continue
if not line.startswith(" ") and line.endswith(":"):
key = line[:-1].strip().strip('"').strip("'")
first = key.split(",")[0].strip().strip('"').strip("'")
current_key = first
continue
if current_key and line.strip().startswith("version "):
m = re.match(r'^\s*version\s+"([^"]+)"\s*$', line)
if m:
ver = m.group(1)
last_at = current_key.rfind("@")
if last_at > 0:
name = current_key[:last_at]
deps.append({"ecosystem": "npm", "name": name, "spec": ver, "scope": "lock"})
current_key = None
return deps
def parse_pnpm_lock_yaml(text: str) -> List[Dict[str, Any]]:
deps: List[Dict[str, Any]] = []
rx = re.compile(r'^\s*/(.+?)@([0-9][^:\s]+):\s*$', re.MULTILINE)
for m in rx.finditer(text):
name = m.group(1).strip()
ver = m.group(2).strip()
if name and ver:
deps.append({"ecosystem": "npm", "name": name, "spec": ver, "scope": "lock"})
return deps
def parse_go_mod(text: str) -> List[Dict[str, Any]]:
deps: List[Dict[str, Any]] = []
in_require = False
for raw in text.splitlines():
line = raw.strip()
if not line or line.startswith("//"):
continue
if line.startswith("require ("):
in_require = True
continue
if in_require and line == ")":
in_require = False
continue
if line.startswith("require "):
line = line[len("require "):].strip()
parts = line.split()
if len(parts) >= 2:
name, ver = parts[0], parts[1]
if name == "go":
continue
deps.append({"ecosystem": "go", "name": name, "spec": ver, "scope": "require"})
return deps
def parse_go_sum(text: str) -> List[Dict[str, Any]]:
deps: List[Dict[str, Any]] = []
seen = set()
for raw in text.splitlines():
line = raw.strip()
if not line:
continue
parts = line.split()
if len(parts) >= 2:
mod, ver = parts[0], parts[1]
ver = ver.replace("/go.mod", "")
key = (mod, ver)
if key in seen:
continue
seen.add(key)
deps.append({"ecosystem": "go", "name": mod, "spec": ver, "scope": "sum"})
return deps
def parse_cargo_toml(text: str) -> List[Dict[str, Any]]:
deps: List[Dict[str, Any]] = []
data = toml_loads(text)
for section, scope in (("dependencies", "dependencies"), ("dev-dependencies", "dev-dependencies"), ("build-dependencies", "build-dependencies")):
block = data.get(section, {}) or {}
for name, spec in block.items():
if isinstance(spec, str):
deps.append({"ecosystem": "cargo", "name": name, "spec": spec, "scope": scope})
elif isinstance(spec, dict):
deps.append({"ecosystem": "cargo", "name": name, "spec": spec.get("version"), "scope": scope})
else:
deps.append({"ecosystem": "cargo", "name": name, "spec": None, "scope": scope})
return deps
def parse_cargo_lock(text: str) -> List[Dict[str, Any]]:
deps: List[Dict[str, Any]] = []
data = toml_loads(text)
pkgs = data.get("package", []) or []
for p in pkgs:
name = p.get("name")
ver = p.get("version")
if name and ver:
deps.append({"ecosystem": "cargo", "name": name, "spec": ver, "scope": "lock"})
return deps
def parse_pom_xml(text: str) -> List[Dict[str, Any]]:
deps: List[Dict[str, Any]] = []
try:
root = ET.fromstring(text)
except ET.ParseError:
return deps
for dep in root.iter():
if not dep.tag.endswith("dependency"):
continue
gid = _xml_text(dep, "groupId")
aid = _xml_text(dep, "artifactId")
ver = _xml_text(dep, "version")
scope = _xml_text(dep, "scope") or "compile"
if gid and aid:
deps.append({"ecosystem": "maven", "name": f"{gid}:{aid}", "spec": ver, "scope": scope})
return deps
def parse_gradle_build(text: str) -> List[Dict[str, Any]]:
deps: List[Dict[str, Any]] = []
rx = re.compile(
r"^\s*(implementation|api|compileOnly|runtimeOnly|testImplementation|testCompileOnly)\s*\(?\s*['\"]([^'\"]+)['\"]\s*\)?\s*$",
re.MULTILINE,
)
for m in rx.finditer(text):
scope = m.group(1)
gav = m.group(2).strip()
parts = gav.split(":")
if len(parts) >= 2:
name = f"{parts[0]}:{parts[1]}"
ver = parts[2] if len(parts) >= 3 else None
deps.append({"ecosystem": "gradle", "name": name, "spec": ver, "scope": scope})
return deps
def parse_csproj(text: str) -> List[Dict[str, Any]]:
deps: List[Dict[str, Any]] = []
try:
root = ET.fromstring(text)
except ET.ParseError:
return deps
for elem in root.iter():
if not elem.tag.endswith("PackageReference"):
continue
name = elem.attrib.get("Include") or elem.attrib.get("Update")
ver = elem.attrib.get("Version")
if ver is None:
for ch in elem:
if ch.tag.endswith("Version") and ch.text:
ver = ch.text.strip()
break
if name:
deps.append({"ecosystem": "nuget", "name": name, "spec": ver, "scope": "csproj"})
return deps
def parse_packages_config(text: str) -> List[Dict[str, Any]]:
deps: List[Dict[str, Any]] = []
try:
root = ET.fromstring(text)
except ET.ParseError:
return deps
for pkg in root.findall(".//package"):
name = pkg.attrib.get("id")
ver = pkg.attrib.get("version")
if name:
deps.append({"ecosystem": "nuget", "name": name, "spec": ver, "scope": "packages.config"})
return deps
def parse_manifest_by_name(filename: str, text: str) -> List[Dict[str, Any]]:
base = os.path.basename(filename)
if base.startswith("requirements") and base.endswith(".txt"):
return parse_requirements_txt(text)
if base == "Pipfile":
return parse_pipfile(text)
if base == "pyproject.toml":
return parse_pyproject_toml(text)
if base == "poetry.lock":
return parse_poetry_lock(text)
if base == "package.json":
return parse_package_json(text)
if base == "package-lock.json":
return parse_package_lock_json(text)
if base == "yarn.lock":
return parse_yarn_lock(text)
if base == "pnpm-lock.yaml":
return parse_pnpm_lock_yaml(text)
if base == "go.mod":
return parse_go_mod(text)
if base == "go.sum":
return parse_go_sum(text)
if base == "Cargo.toml":
return parse_cargo_toml(text)
if base == "Cargo.lock":
return parse_cargo_lock(text)
if base == "pom.xml":
return parse_pom_xml(text)
if base in ("build.gradle", "build.gradle.kts"):
return parse_gradle_build(text)
if base.endswith(".csproj") or base.endswith(".fsproj"):
return parse_csproj(text)
if base == "packages.config":
return parse_packages_config(text)
return []
+14
View File
@@ -0,0 +1,14 @@
from __future__ import annotations
from typing import Iterable, Iterator, Optional, TypeVar
T = TypeVar("T")
def progress(iterable: Iterable[T], *, total: Optional[int] = None, desc: str = "") -> Iterator[T]:
try:
from tqdm import tqdm # type: ignore
return iter(tqdm(iterable, total=total, desc=desc))
except Exception:
return iter(iterable)
+259
View File
@@ -0,0 +1,259 @@
from __future__ import annotations
from collections import Counter, defaultdict
from typing import List, Dict, Any, Tuple, Optional
from .deps_pipeline import dedupe_effective
def _sec(sections: Dict[str, Any] | None, key: str, default: bool) -> bool:
"""Read a boolean 'sections' flag with a default."""
if not isinstance(sections, dict):
return default
v = sections.get(key, default)
return bool(v)
def _sec_str(sections: Dict[str, Any] | None, key: str, default: str) -> str:
if not isinstance(sections, dict):
return default
v = sections.get(key, default)
return str(v)
def _is_dev_scope(eco: str, scope: str | None) -> bool:
scope_l = (scope or "").lower()
if eco == "npm":
return scope_l == "devdependencies"
if eco == "pypi":
return ("dev" in scope_l) or ("test" in scope_l) or scope_l.startswith("optional:")
if eco in {"maven", "gradle"}:
return ("test" in scope_l) or scope_l in {"provided"}
return False
def split_important_and_dev(deps_eff: List[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
important, dev = [], []
for d in deps_eff:
eco = d.get("ecosystem") or "unknown"
if _is_dev_scope(str(eco), d.get("scope")):
dev.append(d)
else:
important.append(d)
return important, dev
def print_deps_grouped(title: str, deps_list: List[Dict[str, Any]], max_per_eco: int) -> None:
print(f" {title}:")
if not deps_list:
print(" - (нет)")
return
by_eco: dict[str, List[Dict[str, Any]]] = defaultdict(list)
for d in deps_list:
by_eco[str(d.get("ecosystem") or "unknown")].append(d)
for eco in sorted(by_eco.keys()):
items = sorted(by_eco[eco], key=lambda x: (str(x.get("name") or "")))
total = len(items)
print(f" [{eco}] {total}")
for i, d in enumerate(items):
if i >= max_per_eco:
print(f" ... ещё {total - i}")
break
name = d.get("name")
spec = d.get("spec")
scope = d.get("scope") or d.get("note")
line = f" - {name}"
if spec:
line += f" [{spec}]"
if scope:
line += f" ({scope})"
print(line)
def print_container_report(
container: Dict[str, Any],
*,
sections: Optional[Dict[str, Any]] = None,
max_deps_per_ecosystem: int = 20,
max_dev_deps_per_ecosystem: int = 10,
top_affected: int = 8,
) -> None:
if _sec(sections, "show_separator", False):
print("=" * 88)
else:
print()
print(f"Контейнер: {container.get('name')} Образ: {container.get('image')}")
if _sec(sections, "show_id_status", True):
line = f" ID: {container.get('id')} Статус: {container.get('status')}"
if _sec(sections, "show_created", False):
line += f" Создан: {container.get('create_time')}"
print(line)
if _sec(sections, "show_ports", False):
print(f" Порты: {container.get('ports')}")
if _sec(sections, "show_mounts", False):
mounts = container.get("mounted_data") or []
if isinstance(mounts, list) and mounts:
print(f" Маунты: {len(mounts)}")
for m in mounts[:5]:
if isinstance(m, dict):
src = m.get("Source") or m.get("Name") or "?"
dst = m.get("Destination") or "?"
mode = m.get("Mode") or ""
rw = "rw" if m.get("RW") else "ro"
extra = f" ({mode},{rw})" if mode else f" ({rw})"
print(f" - {src} -> {dst}{extra}")
if len(mounts) > 5:
print(f" ... ещё {len(mounts) - 5}")
else:
print(" Маунты: 0")
if _sec(sections, "show_versions", True):
print(f" Версия: tag={container.get('version')} label={container.get('image_version_label')}")
if _sec(sections, "show_language", True):
print(f" Язык: {container.get('language')}")
if _sec(sections, "show_source", True):
print(f" Source repo: {container.get('source_url')}")
if _sec(sections, "show_revision", True):
print(f" Revision: {container.get('source_revision')}")
if _sec(sections, "show_deps_source", True):
print(f" Источник зависимостей: {container.get('deps_source')}")
if _sec(sections, "show_manifests", True):
manifests = container.get("dep_manifests_used") or []
mode = _sec_str(sections, "manifests_mode", "count").lower()
if not isinstance(manifests, list):
manifests = []
if mode == "list":
print(f" Манифесты ({len(manifests)}): " + (", ".join(str(x) for x in manifests) if manifests else "(нет)"))
else:
print(f" Манифесты: {len(manifests)}")
raw_deps = container.get("dependencies") or []
if _sec(sections, "show_raw_deps_count", False):
print(f" Зависимостей (сырых): {len(raw_deps)}")
deps_eff = dedupe_effective(raw_deps)
if _sec(sections, "show_effective_deps_count", True):
print(f" Зависимости: {len(deps_eff)} (после дедупликации)")
if _sec(sections, "show_ecosystem_counts", True):
eco_counts = Counter(d.get("ecosystem") for d in deps_eff if d.get("ecosystem"))
if eco_counts:
print(" По экосистемам: " + ", ".join(f"{k}={v}" for k, v in sorted(eco_counts.items())))
if _sec(sections, "show_deps_list", False):
important, dev = split_important_and_dev(deps_eff)
print_deps_grouped("ВАЖНЫЕ (для CVE/анализа)", important, max_per_eco=max_deps_per_ecosystem)
if dev and _sec(sections, "show_dev_deps_list", False):
print_deps_grouped("DEV/TEST (при наличии)", dev, max_per_eco=max_dev_deps_per_ecosystem)
# OSV summary (after severity filtering)
if _sec(sections, "show_osv_summary", True):
if container.get("osv_errors"):
print(f" OSV: ОШИБКА: {container.get('osv_errors')}")
else:
pinned = container.get("osv_pinned_deps") or []
vuln_count = int(container.get("osv_vuln_count") or 0)
affected = container.get("osv_affected_deps") or []
counts = container.get("osv_vuln_counts_by_severity") or {}
sev_summary = ""
if isinstance(counts, dict) and counts:
ordered = ["CRITICAL", "HIGH", "MEDIUM", "LOW", "UNKNOWN"]
parts = [f"{k}={int(counts.get(k, 0))}" for k in ordered if k in counts or k == "UNKNOWN"]
sev_summary = " (" + ", ".join(parts) + ")" if parts else ""
print(f" OSV: уязвимости={vuln_count}; затронутые пакеты={len(affected)}; проверено пакетов={len(pinned)}{sev_summary}")
if affected and _sec(sections, "show_osv_top_affected", True):
print(" OSV: топ уязвимых зависимостей:")
for row in affected[:top_affected]:
if len(row) >= 5:
eco, name, ver, cnt, max_sev = row[:5]
print(f" - {eco}:{name}@{ver} -> {cnt} ids (max={max_sev})")
else:
eco, name, ver, cnt = row[:4]
print(f" - {eco}:{name}@{ver} -> {cnt} ids")
if _sec(sections, "show_osv_sample_ids", False):
top_key = f"{affected[0][0]}:{affected[0][1]}@{affected[0][2]}"
ids = (container.get("osv_vulns_by_dep") or {}).get(top_key, [])
if ids:
print(" OSV: примеры vuln_ids (первые 10): " + ", ".join(ids[:10]))
if _sec(sections, "show_errors", True):
errs = container.get("dep_errors") or []
if errs:
print(" Примечания/ошибки (первые 8):")
for e in errs[:8]:
print(f" ! {e}")
if len(errs) > 8:
print(f" ... ещё {len(errs) - 8}")
if _sec(sections, "show_code_files", False):
print(f" code_files: {container.get('code_files')}")
def _service_key(container: Dict[str, Any]) -> str:
labels = container.get("all_labels") or container.get("labels") or {}
if isinstance(labels, dict):
svc = labels.get("com.docker.compose.service")
proj = labels.get("com.docker.compose.project")
if svc:
return f"{proj}:{svc}" if proj else str(svc)
name = str(container.get("name") or "")
m = name.rsplit("_", 1)
if len(m) == 2 and m[1].isdigit():
return m[0]
image = str(container.get("image") or "")
base = image.split("/")[-1]
base = base.split(":", 1)[0]
return base or name or "unknown"
def print_report(
containers: List[Dict[str, Any]],
*,
group_by_service: bool = True,
sections: Optional[Dict[str, Any]] = None,
max_deps_per_ecosystem: int = 20,
max_dev_deps_per_ecosystem: int = 10,
top_affected: int = 8,
) -> None:
if not group_by_service:
for c in containers:
print_container_report(
c,
sections=sections,
max_deps_per_ecosystem=max_deps_per_ecosystem,
max_dev_deps_per_ecosystem=max_dev_deps_per_ecosystem,
top_affected=top_affected,
)
return
groups: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
for c in containers:
groups[_service_key(c)].append(c)
for key in sorted(groups.keys()):
if _sec(sections, "show_service_separator", True):
print("" * 88)
if _sec(sections, "show_service_header", True):
print(f"Сервис: {key} Контейнеров: {len(groups[key])}")
for c in groups[key]:
print_container_report(
c,
sections=sections,
max_deps_per_ecosystem=max_deps_per_ecosystem,
max_dev_deps_per_ecosystem=max_dev_deps_per_ecosystem,
top_affected=top_affected,
)
+157
View File
@@ -0,0 +1,157 @@
from __future__ import annotations
import os
import shlex
from typing import List, Dict, Any, Optional
import docker
from .container_io import exec_shell
from .progress import progress
from .deps_pipeline import (
find_manifest_paths_in_container,
extract_deps_from_container,
extract_deps_from_repo,
)
def _get_source_labels(all_labels: Dict[str, str]) -> tuple[Optional[str], Optional[str], Optional[str]]:
source_url = (
all_labels.get("org.opencontainers.image.source")
or all_labels.get("org.opencontainers.image.url")
or all_labels.get("org.opencontainers.image.documentation")
or all_labels.get("org.label-schema.vcs-url")
or all_labels.get("org.label-schema.url")
)
source_revision = (
all_labels.get("org.opencontainers.image.revision")
or all_labels.get("org.label-schema.vcs-ref")
)
image_version_label = (
all_labels.get("org.opencontainers.image.version")
or all_labels.get("org.label-schema.version")
)
return source_url, source_revision, image_version_label
def _guess_language_from_manifests(paths: List[str]) -> Optional[str]:
for p in paths:
base = os.path.basename(p)
lower = base.lower()
if lower.startswith("requirements") or base in ("Pipfile", "pyproject.toml", "poetry.lock"):
return "python"
if base in ("package.json", "package-lock.json", "yarn.lock", "pnpm-lock.yaml"):
return "nodejs"
if base in ("go.mod", "go.sum"):
return "go"
if base in ("Cargo.toml", "Cargo.lock"):
return "rust"
if base in ("pom.xml", "build.gradle", "build.gradle.kts"):
return "java"
if lower.endswith((".csproj", ".fsproj")) or base == "packages.config":
return ".net"
return None
def _find_code_files(container, workdir: str, max_depth: int = 4, limit: int = 10) -> List[str]:
wd_q = shlex.quote(workdir or "/")
cmd = (
f"cd {wd_q} 2>/dev/null || cd /; "
f"find . -maxdepth {max_depth} -type f \\( -name 'main.*' -o -name 'app.*' -o -name 'index.*' \\) "
f"2>/dev/null | head -n {limit}"
)
exit_code, out = exec_shell(container, cmd)
if exit_code != 0:
return []
return [line.strip() for line in out.splitlines() if line.strip()]
def _guess_language_from_code_files(code_paths: List[str]) -> Optional[str]:
for p in code_paths:
pl = p.lower()
if pl.endswith(".py"):
return "python"
if pl.endswith((".js", ".mjs", ".cjs")):
return "nodejs"
if pl.endswith(".go"):
return "go"
if pl.endswith(".rs"):
return "rust"
if pl.endswith((".java", ".kt")):
return "java"
if pl.endswith((".cs", ".fs")):
return ".net"
return None
def scan_running_containers(*, progress_enabled: bool = True) -> List[Dict[str, Any]]:
client = docker.from_env()
result: List[Dict[str, Any]] = []
containers = client.containers.list()
it = progress(containers, total=len(containers), desc="Сканирование контейнеров") if progress_enabled else iter(containers)
for c in it:
info = c.attrs
image = info["Config"]["Image"]
created = info["Created"]
ports = info["NetworkSettings"]["Ports"]
mounts = info.get("Mounts", [])
status = c.status
labels_container = info["Config"].get("Labels", {}) or {}
image_cfg = c.image.attrs.get("Config", {}) or {}
image_labels = image_cfg.get("Labels", {}) or {}
all_labels = {**image_labels, **labels_container}
source_url, source_revision, image_version_label = _get_source_labels(all_labels)
tags = c.image.tags
tag_version = tags[0].split(":", 1)[-1] if tags else None
workdir = info["Config"].get("WorkingDir") or "/"
dep_paths = find_manifest_paths_in_container(c, workdir)
language = _guess_language_from_manifests(dep_paths)
code_paths = _find_code_files(c, workdir)
if language is None:
language = _guess_language_from_code_files(code_paths)
container_data: Dict[str, Any] = {
"name": c.name,
"image": image,
"id": c.id[:12],
"status": status,
"create_time": created,
"ports": ports,
"mounted_data": mounts,
"version": tag_version,
"image_version_label": image_version_label,
"labels": labels_container,
"all_labels": all_labels,
"source_url": source_url,
"source_revision": source_revision,
"language": language,
"dep_files": dep_paths,
"code_files": code_paths,
}
deps_source = "none"
deps_info: Dict[str, Any] = {"manifests": [], "dependencies": [], "errors": []}
if dep_paths:
deps_source = "container"
deps_info = extract_deps_from_container(c, workdir, dep_paths)
elif source_url:
deps_source = "git"
deps_info = extract_deps_from_repo(source_url, source_revision)
container_data["deps_source"] = deps_source
container_data["dep_manifests_used"] = deps_info.get("manifests", [])
container_data["dependencies"] = deps_info.get("dependencies", [])
container_data["dep_errors"] = deps_info.get("errors", [])
result.append(container_data)
return result
+9
View File
@@ -0,0 +1,9 @@
from __future__ import annotations
try:
import tomllib as _toml
except ImportError:
import tomli as _toml
def loads(text: str):
return _toml.loads(text)
+75
View File
@@ -0,0 +1,75 @@
from __future__ import annotations
import json
from pathlib import Path
from typing import Any, Dict
DEFAULT_CONFIG: Dict[str, Any] = {
"progress": {"enabled": True},
"output": {
"group_by_service": True,
"max_deps_per_ecosystem": 20,
"max_dev_deps_per_ecosystem": 10,
"sections": {
"show_service_separator": True,
"show_service_header": True,
"show_separator": False,
"show_id_status": False,
"show_created": False,
"show_ports": False,
"show_mounts": False,
"show_versions": True,
"show_language": False,
"show_source": True,
"show_revision": False,
"show_deps_source": True,
"show_manifests": True,
"manifests_mode": "count",
"show_raw_deps_count": False,
"show_ecosystem_counts": True,
"show_effective_deps_count": True,
"show_deps_list": False,
"show_dev_deps_list": False,
"show_code_files": False,
"show_osv_summary": True,
"show_osv_top_affected": True,
"show_osv_sample_ids": False,
"show_errors": True,
},
},
"vulns": {
"min_severity": "MEDIUM",
"include_unknown": True,
"top_affected": 8,
},
"osv": {
"chunk_size": 250,
"hydrate_details": True,
"max_hydrate_ids": 300,
},
}
def _deep_merge(dst: Dict[str, Any], src: Dict[str, Any]) -> Dict[str, Any]:
for k, v in src.items():
if isinstance(v, dict) and isinstance(dst.get(k), dict):
dst[k] = _deep_merge(dict(dst[k]), v)
else:
dst[k] = v
return dst
def load_config(filename: str = "cvexplorer_config.json") -> Dict[str, Any]:
cfg = json.loads(json.dumps(DEFAULT_CONFIG)) # deep copy
path = Path.cwd() / filename
if not path.exists():
return cfg
try:
user_cfg = json.loads(path.read_text(encoding="utf-8"))
if isinstance(user_cfg, dict):
return _deep_merge(cfg, user_cfg)
return cfg
except Exception:
return cfg