"""Save and load coverage data for decoupled matrix generation."""
import json
import os
import shutil
import tempfile
import warnings
from pathlib import Path
from typing import Any
from jamb.core.models import (
Item,
ItemCoverage,
LinkedTest,
MatrixMetadata,
TestEnvironment,
TraceabilityGraph,
)
# Default path of the coverage file used by save_coverage() and load_coverage().
COVERAGE_FILE = ".jamb"
# CURRENT_VERSION is the format version stamped into newly written files.
# SUPPORTED_VERSIONS lists every version load_coverage() accepts, so files
# written with an older format version remain readable.
CURRENT_VERSION = 2
SUPPORTED_VERSIONS = {1, 2}
# Top-level keys that must be present for a file to pass validation on load.
REQUIRED_FIELDS = {"coverage", "graph"}
[docs]
def save_coverage(
    coverage: dict[str, ItemCoverage],
    graph: TraceabilityGraph,
    output_path: str = COVERAGE_FILE,
    metadata: MatrixMetadata | None = None,
    manual_tc_ids: dict[str, str] | None = None,
) -> None:
    """Save coverage data to a .jamb file for later matrix generation.

    The file is written to a temporary file in the destination directory and
    then renamed over the target, so an interrupted run cannot leave a
    half-written coverage file. If that path fails with an ``OSError`` the
    function falls back to writing the target directly.

    Args:
        coverage: Coverage data mapping UIDs to ItemCoverage.
        graph: The traceability graph with all items and relationships.
        output_path: Path to write the coverage file (default: .jamb).
        metadata: Optional matrix metadata for IEC 62304 compliance.
        manual_tc_ids: Optional dict mapping nodeid to manual TC ID.

    Raises:
        OSError: If both the atomic write and the direct fallback write fail.
    """
    data: dict[str, Any] = {
        "version": CURRENT_VERSION,
        "coverage": {},
        "graph": {
            "items": {},
            "item_parents": graph.item_parents,
            "item_children": graph.item_children,
            "document_parents": graph.document_parents,
        },
    }

    # Save manual TC IDs if provided
    if manual_tc_ids:
        data["manual_tc_ids"] = manual_tc_ids

    # Serialize coverage
    for uid, cov in coverage.items():
        data["coverage"][uid] = {
            "item": _serialize_item(cov.item),
            "linked_tests": [_serialize_linked_test(lt) for lt in cov.linked_tests],
        }

    # Serialize all graph items (not just coverage items)
    for uid, item in graph.items.items():
        data["graph"]["items"][uid] = _serialize_item(item)

    # Serialize metadata if provided
    if metadata:
        data["metadata"] = _serialize_metadata(metadata)

    # Serialize once, before touching the filesystem: a non-serializable value
    # fails early without leaving anything on disk, and the atomic path and the
    # fallback are guaranteed to write identical content.
    payload = json.dumps(data, indent=2)

    path = Path(output_path)
    path.parent.mkdir(parents=True, exist_ok=True)

    # Tracks a temp file that still needs removing; reset to None once it has
    # been renamed onto the target.
    temp_path: str | None = None
    try:
        # The temp file lives in the target directory so the final rename
        # never crosses a filesystem boundary.
        with tempfile.NamedTemporaryFile(
            mode="w",
            dir=path.parent,
            delete=False,
            suffix=".tmp",
            encoding="utf-8",
        ) as f:
            temp_path = f.name
            f.write(payload)
            f.flush()
            os.fsync(f.fileno())
        # os.replace is an atomic rename that also overwrites an existing
        # target; it never degrades to a copy the way shutil.move can.
        os.replace(temp_path, path)
        temp_path = None
    except OSError as atomic_err:
        # Fallback to direct write if atomic write fails
        try:
            path.write_text(payload, encoding="utf-8")
        except OSError as fallback_err:
            raise OSError(
                f"Failed to write coverage file to {path}. "
                f"Atomic write failed: {atomic_err}. "
                f"Fallback write also failed: {fallback_err}"
            ) from fallback_err
    finally:
        # Do not leave stray *.tmp files behind when the atomic path failed.
        if temp_path is not None:
            try:
                os.unlink(temp_path)
            except OSError:
                # Best-effort cleanup only; the outcome of the write itself
                # has already been decided above.
                pass
[docs]
def load_coverage(
    input_path: str = COVERAGE_FILE,
) -> tuple[dict[str, ItemCoverage], TraceabilityGraph, MatrixMetadata | None, dict[str, str]]:
    """Load coverage data from a .jamb file.

    Args:
        input_path: Path to the coverage file (default: .jamb).

    Returns:
        Tuple of (coverage dict, TraceabilityGraph, optional MatrixMetadata, manual_tc_ids).
        manual_tc_ids maps test nodeid to manual TC ID (empty dict if none).

    Raises:
        FileNotFoundError: If the coverage file does not exist.
        ValueError: If the file is not valid JSON, its top level is not a JSON
            object, its version is unsupported, a required section is missing
            or is not a JSON object, or an item entry lacks a required field.

    Warns:
        A warning is issued for coverage entries that are malformed (and
        therefore skipped) and for coverage UIDs that are absent from the graph.
    """
    path = Path(input_path)
    if not path.exists():
        raise FileNotFoundError(f"Coverage file not found: {input_path}")

    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except json.JSONDecodeError as e:
        raise ValueError(f"Invalid JSON in coverage file: {e}") from e

    # Valid JSON is not necessarily an object (e.g. "[]" or "null"); reject it
    # here so the lookups below cannot fail with AttributeError.
    if not isinstance(data, dict):
        raise ValueError(
            f"Corrupt .jamb file, expected a JSON object at top level but found {type(data).__name__}"
        )

    # Validate version; a file without a version field is treated as version 0
    version = data.get("version", 0)
    if version not in SUPPORTED_VERSIONS:
        raise ValueError(
            f"Unsupported .jamb file version {version}. "
            f"Supported versions: {sorted(SUPPORTED_VERSIONS)}. "
            "Regenerate with 'pytest --jamb'."
        )

    # Validate required fields
    missing = REQUIRED_FIELDS - set(data.keys())
    if missing:
        raise ValueError(f"Corrupt .jamb file, missing required fields: {sorted(missing)}")

    # The required sections must themselves be JSON objects (not null, lists, ...)
    malformed = sorted(name for name in REQUIRED_FIELDS if not isinstance(data[name], dict))
    if malformed:
        raise ValueError(f"Corrupt .jamb file, fields must be JSON objects: {malformed}")

    # Deserialize graph
    graph = TraceabilityGraph()
    graph_data = data["graph"]

    # Restore document parents first
    for prefix, parents in graph_data.get("document_parents", {}).items():
        graph.set_document_parents(prefix, parents)

    # Restore items
    for item_data in graph_data.get("items", {}).values():
        graph.add_item(_deserialize_item(item_data))

    # Deserialize coverage
    coverage: dict[str, ItemCoverage] = {}
    orphaned_uids: list[str] = []
    for uid, cov_data in data["coverage"].items():
        # Validate that coverage item UID exists in graph
        if uid not in graph.items:
            orphaned_uids.append(uid)

        # Skip entries that are not objects or carry no item payload
        if not isinstance(cov_data, dict) or "item" not in cov_data:
            warnings.warn(
                f"Malformed coverage entry for '{uid}': missing 'item' field",
                stacklevel=2,
            )
            continue

        item = _deserialize_item(cov_data["item"])
        linked_tests = [_deserialize_linked_test(lt) for lt in cov_data.get("linked_tests", [])]
        coverage[uid] = ItemCoverage(item=item, linked_tests=linked_tests)

    # Warn about orphaned items (in coverage but not in graph)
    if orphaned_uids:
        preview = orphaned_uids[:5]
        suffix = f" and {len(orphaned_uids) - 5} more" if len(orphaned_uids) > 5 else ""
        warnings.warn(
            f"Orphaned items in coverage not found in graph: {preview}{suffix}",
            stacklevel=2,
        )

    # Deserialize metadata if present
    metadata = None
    if "metadata" in data:
        metadata = _deserialize_metadata(data["metadata"])

    # Load manual TC IDs (version 2+); an explicit null is normalized to an
    # empty dict so the documented return type always holds.
    manual_tc_ids: dict[str, str] = data.get("manual_tc_ids") or {}

    return coverage, graph, metadata, manual_tc_ids
def _serialize_item(item: Item) -> dict[str, Any]:
    """Convert an Item into a plain dictionary.

    Each listed attribute is copied verbatim under a key of the same name,
    in the order given below.
    """
    field_names = (
        "uid",
        "text",
        "document_prefix",
        "active",
        "type",
        "header",
        "level",
        "links",
        "reviewed",
        "derived",
        "testable",
        "custom_attributes",
    )
    return {name: getattr(item, name) for name in field_names}
VALID_ITEM_TYPES = {"requirement", "info", "heading"}


def _deserialize_item(data: dict[str, Any]) -> Item:
    """Build an Item from its dictionary form.

    Args:
        data: Dictionary with item fields.

    Returns:
        The reconstructed Item. Optional fields absent from ``data`` take the
        defaults spelled out in the body.

    Raises:
        ValueError: If any of uid, text or document_prefix is missing; the
            first missing one (in that order) is named in the message.

    Note:
        An unrecognized item type triggers a warning and is replaced by
        'requirement'.
    """
    # Find the first required key that is absent, if any
    absent = next(
        (name for name in ("uid", "text", "document_prefix") if name not in data),
        None,
    )
    if absent is not None:
        raise ValueError(f"Missing required field '{absent}' in item data")

    # Resolve the item type, falling back to 'requirement' for unknown values
    declared_type = data.get("type", "requirement")
    if declared_type in VALID_ITEM_TYPES:
        resolved_type = declared_type
    else:
        warnings.warn(
            f"Invalid item type '{declared_type}' for item '{data['uid']}', "
            f"expected one of {sorted(VALID_ITEM_TYPES)}. Defaulting to 'requirement'.",
            stacklevel=3,
        )
        resolved_type = "requirement"

    kwargs: dict[str, Any] = {
        "uid": data["uid"],
        "text": data["text"],
        "document_prefix": data["document_prefix"],
        "active": data.get("active", True),
        "type": resolved_type,
        "header": data.get("header"),
        "level": data.get("level"),
        "links": data.get("links", []),
        "reviewed": data.get("reviewed"),
        "derived": data.get("derived", False),
        "testable": data.get("testable", True),
        "custom_attributes": data.get("custom_attributes", {}),
    }
    return Item(**kwargs)
def _serialize_linked_test(lt: LinkedTest) -> dict[str, Any]:
    """Convert a LinkedTest into a plain dictionary.

    Every listed attribute is stored unchanged under a key of the same name.
    """
    serialized: dict[str, Any] = {}
    for attr in (
        "test_nodeid",
        "item_uid",
        "test_outcome",
        "notes",
        "test_actions",
        "expected_results",
        "actual_results",
        "execution_timestamp",
    ):
        serialized[attr] = getattr(lt, attr)
    return serialized
def _validate_timestamp(ts: str | None) -> str | None:
"""Validate timestamp format (ISO 8601).
Args:
ts: Timestamp string to validate, or None.
Returns:
The timestamp if valid, or None if invalid.
"""
if ts is None:
return None
try:
from datetime import datetime
# Try to parse ISO format (handles both with and without timezone)
datetime.fromisoformat(ts.replace("Z", "+00:00"))
return ts
except (ValueError, AttributeError):
warnings.warn(f"Invalid timestamp format: {ts}", stacklevel=3)
return None
def _deserialize_linked_test(data: dict[str, Any]) -> LinkedTest:
    """Build a LinkedTest from its dictionary form.

    ``test_nodeid`` and ``item_uid`` are mandatory (a missing key raises
    KeyError). The list-valued fields default to empty lists, and the
    execution timestamp is passed through _validate_timestamp.
    """
    nodeid = data["test_nodeid"]
    uid = data["item_uid"]
    outcome = data.get("test_outcome")
    # Each lookup gets its own fresh empty list as the default
    notes, actions, expected, actual = (
        data.get(key, [])
        for key in ("notes", "test_actions", "expected_results", "actual_results")
    )
    timestamp = _validate_timestamp(data.get("execution_timestamp"))
    return LinkedTest(
        test_nodeid=nodeid,
        item_uid=uid,
        test_outcome=outcome,
        notes=notes,
        test_actions=actions,
        expected_results=expected,
        actual_results=actual,
        execution_timestamp=timestamp,
    )
def _serialize_metadata(metadata: MatrixMetadata) -> dict[str, Any]:
    """Convert MatrixMetadata into a plain dictionary.

    The "environment" key is emitted only when the metadata carries a truthy
    environment object.
    """
    payload: dict[str, Any] = dict(
        software_version=metadata.software_version,
        tester_id=metadata.tester_id,
        execution_timestamp=metadata.execution_timestamp,
    )

    env = metadata.environment
    if not env:
        return payload

    env_fields = (
        "os_name",
        "os_version",
        "python_version",
        "platform",
        "processor",
        "hostname",
        "cpu_count",
        "test_tools",
    )
    payload["environment"] = {name: getattr(env, name) for name in env_fields}
    return payload
def _deserialize_metadata(data: dict[str, Any]) -> MatrixMetadata:
    """Build MatrixMetadata from its dictionary form.

    A TestEnvironment is constructed only when ``data`` holds a non-empty
    "environment" entry; otherwise the environment is None. Missing string
    fields of the environment default to "", and a missing tester_id
    defaults to "Unknown".
    """
    env_data = data["environment"] if "environment" in data else None

    environment = None
    if env_data:
        text_fields = (
            "os_name",
            "os_version",
            "python_version",
            "platform",
            "processor",
            "hostname",
        )
        environment = TestEnvironment(
            **{name: env_data.get(name, "") for name in text_fields},
            cpu_count=env_data.get("cpu_count"),
            test_tools=env_data.get("test_tools", {}),
        )

    top_level = {
        "software_version": data.get("software_version"),
        "tester_id": data.get("tester_id", "Unknown"),
        "execution_timestamp": data.get("execution_timestamp"),
    }
    return MatrixMetadata(environment=environment, **top_level)