158 lines
5.4 KiB
Python
158 lines
5.4 KiB
Python
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import os
|
||
|
|
import shlex
|
||
|
|
from typing import List, Dict, Any, Optional
|
||
|
|
|
||
|
|
import docker
|
||
|
|
|
||
|
|
from .container_io import exec_shell
|
||
|
|
from .progress import progress
|
||
|
|
from .deps_pipeline import (
|
||
|
|
find_manifest_paths_in_container,
|
||
|
|
extract_deps_from_container,
|
||
|
|
extract_deps_from_repo,
|
||
|
|
)
|
||
|
|
|
||
|
|
|
||
|
|
def _get_source_labels(all_labels: Dict[str, str]) -> tuple[Optional[str], Optional[str], Optional[str]]:
|
||
|
|
source_url = (
|
||
|
|
all_labels.get("org.opencontainers.image.source")
|
||
|
|
or all_labels.get("org.opencontainers.image.url")
|
||
|
|
or all_labels.get("org.opencontainers.image.documentation")
|
||
|
|
or all_labels.get("org.label-schema.vcs-url")
|
||
|
|
or all_labels.get("org.label-schema.url")
|
||
|
|
)
|
||
|
|
source_revision = (
|
||
|
|
all_labels.get("org.opencontainers.image.revision")
|
||
|
|
or all_labels.get("org.label-schema.vcs-ref")
|
||
|
|
)
|
||
|
|
image_version_label = (
|
||
|
|
all_labels.get("org.opencontainers.image.version")
|
||
|
|
or all_labels.get("org.label-schema.version")
|
||
|
|
)
|
||
|
|
return source_url, source_revision, image_version_label
|
||
|
|
|
||
|
|
|
||
|
|
def _guess_language_from_manifests(paths: List[str]) -> Optional[str]:
|
||
|
|
for p in paths:
|
||
|
|
base = os.path.basename(p)
|
||
|
|
lower = base.lower()
|
||
|
|
if lower.startswith("requirements") or base in ("Pipfile", "pyproject.toml", "poetry.lock"):
|
||
|
|
return "python"
|
||
|
|
if base in ("package.json", "package-lock.json", "yarn.lock", "pnpm-lock.yaml"):
|
||
|
|
return "nodejs"
|
||
|
|
if base in ("go.mod", "go.sum"):
|
||
|
|
return "go"
|
||
|
|
if base in ("Cargo.toml", "Cargo.lock"):
|
||
|
|
return "rust"
|
||
|
|
if base in ("pom.xml", "build.gradle", "build.gradle.kts"):
|
||
|
|
return "java"
|
||
|
|
if lower.endswith((".csproj", ".fsproj")) or base == "packages.config":
|
||
|
|
return ".net"
|
||
|
|
return None
|
||
|
|
|
||
|
|
|
||
|
|
def _find_code_files(container, workdir: str, max_depth: int = 4, limit: int = 10) -> List[str]:
|
||
|
|
wd_q = shlex.quote(workdir or "/")
|
||
|
|
cmd = (
|
||
|
|
f"cd {wd_q} 2>/dev/null || cd /; "
|
||
|
|
f"find . -maxdepth {max_depth} -type f \\( -name 'main.*' -o -name 'app.*' -o -name 'index.*' \\) "
|
||
|
|
f"2>/dev/null | head -n {limit}"
|
||
|
|
)
|
||
|
|
exit_code, out = exec_shell(container, cmd)
|
||
|
|
if exit_code != 0:
|
||
|
|
return []
|
||
|
|
return [line.strip() for line in out.splitlines() if line.strip()]
|
||
|
|
|
||
|
|
|
||
|
|
def _guess_language_from_code_files(code_paths: List[str]) -> Optional[str]:
|
||
|
|
for p in code_paths:
|
||
|
|
pl = p.lower()
|
||
|
|
if pl.endswith(".py"):
|
||
|
|
return "python"
|
||
|
|
if pl.endswith((".js", ".mjs", ".cjs")):
|
||
|
|
return "nodejs"
|
||
|
|
if pl.endswith(".go"):
|
||
|
|
return "go"
|
||
|
|
if pl.endswith(".rs"):
|
||
|
|
return "rust"
|
||
|
|
if pl.endswith((".java", ".kt")):
|
||
|
|
return "java"
|
||
|
|
if pl.endswith((".cs", ".fs")):
|
||
|
|
return ".net"
|
||
|
|
return None
|
||
|
|
|
||
|
|
|
||
|
|
def scan_running_containers(*, progress_enabled: bool = True) -> List[Dict[str, Any]]:
|
||
|
|
client = docker.from_env()
|
||
|
|
result: List[Dict[str, Any]] = []
|
||
|
|
|
||
|
|
containers = client.containers.list()
|
||
|
|
it = progress(containers, total=len(containers), desc="Сканирование контейнеров") if progress_enabled else iter(containers)
|
||
|
|
for c in it:
|
||
|
|
info = c.attrs
|
||
|
|
|
||
|
|
image = info["Config"]["Image"]
|
||
|
|
created = info["Created"]
|
||
|
|
ports = info["NetworkSettings"]["Ports"]
|
||
|
|
mounts = info.get("Mounts", [])
|
||
|
|
status = c.status
|
||
|
|
|
||
|
|
labels_container = info["Config"].get("Labels", {}) or {}
|
||
|
|
image_cfg = c.image.attrs.get("Config", {}) or {}
|
||
|
|
image_labels = image_cfg.get("Labels", {}) or {}
|
||
|
|
all_labels = {**image_labels, **labels_container}
|
||
|
|
|
||
|
|
source_url, source_revision, image_version_label = _get_source_labels(all_labels)
|
||
|
|
|
||
|
|
tags = c.image.tags
|
||
|
|
tag_version = tags[0].split(":", 1)[-1] if tags else None
|
||
|
|
|
||
|
|
workdir = info["Config"].get("WorkingDir") or "/"
|
||
|
|
|
||
|
|
dep_paths = find_manifest_paths_in_container(c, workdir)
|
||
|
|
language = _guess_language_from_manifests(dep_paths)
|
||
|
|
|
||
|
|
code_paths = _find_code_files(c, workdir)
|
||
|
|
if language is None:
|
||
|
|
language = _guess_language_from_code_files(code_paths)
|
||
|
|
|
||
|
|
container_data: Dict[str, Any] = {
|
||
|
|
"name": c.name,
|
||
|
|
"image": image,
|
||
|
|
"id": c.id[:12],
|
||
|
|
"status": status,
|
||
|
|
"create_time": created,
|
||
|
|
"ports": ports,
|
||
|
|
"mounted_data": mounts,
|
||
|
|
"version": tag_version,
|
||
|
|
"image_version_label": image_version_label,
|
||
|
|
"labels": labels_container,
|
||
|
|
"all_labels": all_labels,
|
||
|
|
"source_url": source_url,
|
||
|
|
"source_revision": source_revision,
|
||
|
|
"language": language,
|
||
|
|
"dep_files": dep_paths,
|
||
|
|
"code_files": code_paths,
|
||
|
|
}
|
||
|
|
|
||
|
|
deps_source = "none"
|
||
|
|
deps_info: Dict[str, Any] = {"manifests": [], "dependencies": [], "errors": []}
|
||
|
|
|
||
|
|
if dep_paths:
|
||
|
|
deps_source = "container"
|
||
|
|
deps_info = extract_deps_from_container(c, workdir, dep_paths)
|
||
|
|
elif source_url:
|
||
|
|
deps_source = "git"
|
||
|
|
deps_info = extract_deps_from_repo(source_url, source_revision)
|
||
|
|
|
||
|
|
container_data["deps_source"] = deps_source
|
||
|
|
container_data["dep_manifests_used"] = deps_info.get("manifests", [])
|
||
|
|
container_data["dependencies"] = deps_info.get("dependencies", [])
|
||
|
|
container_data["dep_errors"] = deps_info.get("errors", [])
|
||
|
|
|
||
|
|
result.append(container_data)
|
||
|
|
|
||
|
|
return result
|