from __future__ import annotations import os import shlex from typing import List, Dict, Any, Optional import docker from .container_io import exec_shell from .progress import progress from .deps_pipeline import ( find_manifest_paths_in_container, extract_deps_from_container, extract_deps_from_repo, ) def _get_source_labels(all_labels: Dict[str, str]) -> tuple[Optional[str], Optional[str], Optional[str]]: source_url = ( all_labels.get("org.opencontainers.image.source") or all_labels.get("org.opencontainers.image.url") or all_labels.get("org.opencontainers.image.documentation") or all_labels.get("org.label-schema.vcs-url") or all_labels.get("org.label-schema.url") ) source_revision = ( all_labels.get("org.opencontainers.image.revision") or all_labels.get("org.label-schema.vcs-ref") ) image_version_label = ( all_labels.get("org.opencontainers.image.version") or all_labels.get("org.label-schema.version") ) return source_url, source_revision, image_version_label def _guess_language_from_manifests(paths: List[str]) -> Optional[str]: for p in paths: base = os.path.basename(p) lower = base.lower() if lower.startswith("requirements") or base in ("Pipfile", "pyproject.toml", "poetry.lock"): return "python" if base in ("package.json", "package-lock.json", "yarn.lock", "pnpm-lock.yaml"): return "nodejs" if base in ("go.mod", "go.sum"): return "go" if base in ("Cargo.toml", "Cargo.lock"): return "rust" if base in ("pom.xml", "build.gradle", "build.gradle.kts"): return "java" if lower.endswith((".csproj", ".fsproj")) or base == "packages.config": return ".net" return None def _find_code_files(container, workdir: str, max_depth: int = 4, limit: int = 10) -> List[str]: wd_q = shlex.quote(workdir or "/") cmd = ( f"cd {wd_q} 2>/dev/null || cd /; " f"find . -maxdepth {max_depth} -type f \\( -name 'main.*' -o -name 'app.*' -o -name 'index.*' \\) " f"2>/dev/null | head -n {limit}" ) exit_code, out = exec_shell(container, cmd) if exit_code != 0: return [] return [line.strip() for line in out.splitlines() if line.strip()] def _guess_language_from_code_files(code_paths: List[str]) -> Optional[str]: for p in code_paths: pl = p.lower() if pl.endswith(".py"): return "python" if pl.endswith((".js", ".mjs", ".cjs")): return "nodejs" if pl.endswith(".go"): return "go" if pl.endswith(".rs"): return "rust" if pl.endswith((".java", ".kt")): return "java" if pl.endswith((".cs", ".fs")): return ".net" return None def scan_running_containers(*, progress_enabled: bool = True) -> List[Dict[str, Any]]: client = docker.from_env() result: List[Dict[str, Any]] = [] containers = client.containers.list() it = progress(containers, total=len(containers), desc="Сканирование контейнеров") if progress_enabled else iter(containers) for c in it: info = c.attrs image = info["Config"]["Image"] created = info["Created"] ports = info["NetworkSettings"]["Ports"] mounts = info.get("Mounts", []) status = c.status labels_container = info["Config"].get("Labels", {}) or {} image_cfg = c.image.attrs.get("Config", {}) or {} image_labels = image_cfg.get("Labels", {}) or {} all_labels = {**image_labels, **labels_container} source_url, source_revision, image_version_label = _get_source_labels(all_labels) tags = c.image.tags tag_version = tags[0].split(":", 1)[-1] if tags else None workdir = info["Config"].get("WorkingDir") or "/" dep_paths = find_manifest_paths_in_container(c, workdir) language = _guess_language_from_manifests(dep_paths) code_paths = _find_code_files(c, workdir) if language is None: language = _guess_language_from_code_files(code_paths) container_data: Dict[str, Any] = { "name": c.name, "image": image, "id": c.id[:12], "status": status, "create_time": created, "ports": ports, "mounted_data": mounts, "version": tag_version, "image_version_label": image_version_label, "labels": labels_container, "all_labels": all_labels, "source_url": source_url, "source_revision": source_revision, "language": language, "dep_files": dep_paths, "code_files": code_paths, } deps_source = "none" deps_info: Dict[str, Any] = {"manifests": [], "dependencies": [], "errors": []} if dep_paths: deps_source = "container" deps_info = extract_deps_from_container(c, workdir, dep_paths) elif source_url: deps_source = "git" deps_info = extract_deps_from_repo(source_url, source_revision) container_data["deps_source"] = deps_source container_data["dep_manifests_used"] = deps_info.get("manifests", []) container_data["dependencies"] = deps_info.get("dependencies", []) container_data["dep_errors"] = deps_info.get("errors", []) result.append(container_data) return result