Source code for jamb.storage.validation

"""Validation module for jamb's native storage layer."""

import logging
from dataclasses import dataclass
from typing import Literal

from jamb.core.models import TraceabilityGraph
from jamb.storage.document_dag import DocumentDAG
from jamb.storage.items import compute_content_hash, read_item

logger = logging.getLogger("jamb")


[docs] @dataclass class ValidationIssue: """A single validation issue. Attributes: level (str): Severity — ``"error"``, ``"warning"``, or ``"info"``. uid (str | None): UID of the item involved, or ``None`` for document-level issues. prefix (str | None): Document prefix involved, or ``None`` when not applicable. message (str): Human-readable description of the issue. """ level: Literal["error", "warning", "info"] uid: str | None prefix: str | None message: str def __str__(self) -> str: """Return a human-readable representation of the validation issue.""" parts = [f"[{self.level.upper()}]"] if self.uid and self.prefix: parts.append(f"{self.prefix}:{self.uid}") elif self.uid: parts.append(self.uid) elif self.prefix: parts.append(self.prefix) parts.append(self.message) return " ".join(parts)
def validate(
    dag: DocumentDAG,
    graph: TraceabilityGraph,
    *,
    check_links: bool = True,
    check_suspect: bool = True,
    check_review: bool = True,
    check_children: bool = True,
    check_empty_docs: bool = True,
    check_empty_text: bool = True,
    check_self_links: bool = True,
    check_item_cycles: bool = True,
    check_unlinked: bool = True,
    skip_prefixes: list[str] | None = None,
) -> list[ValidationIssue]:
    """Run the enabled validation checks over the document tree.

    The document-DAG acyclicity check always runs; every other check is
    controlled by its keyword flag. Results are concatenated in a fixed
    order: DAG cycles, links, suspect links, review status, children,
    empty documents, empty text, item link cycles, unlinked items.

    Args:
        dag: The document DAG.
        graph: The traceability graph with items.
        check_links: Check link validity and conformance.
        check_suspect: Check for suspect links (hash mismatch).
        check_review: Check review status.
        check_children: Check that non-leaf docs have children linking
            to them.
        check_empty_docs: Check for documents with no items.
        check_empty_text: Check for items with empty text.
        check_self_links: Check for items linking to themselves. Only
            consulted by the link check, so it has no effect when
            ``check_links`` is false.
        check_item_cycles: Check for cycles in item-to-item links.
        check_unlinked: Check for unlinked normative items in child
            documents.
        skip_prefixes: Document prefixes to skip during validation.

    Returns:
        List of ValidationIssue objects.
    """
    excluded: set[str] = set(skip_prefixes) if skip_prefixes else set()

    # 1. DAG acyclicity — unconditional, reported as document-less errors.
    found: list[ValidationIssue] = [
        ValidationIssue("error", None, None, problem) for problem in dag.validate_acyclic()
    ]

    # 2.–9. Optional checks, listed in the order their results are reported.
    # Each entry pairs the enabling flag with a thunk so that disabled
    # checks are never executed.
    optional_checks = (
        (check_links, lambda: _check_links(dag, graph, excluded, check_self_links)),
        (check_suspect, lambda: _check_suspect_links(dag, graph, excluded)),
        (check_review, lambda: _check_review_status(graph, excluded)),
        (check_children, lambda: _check_children(dag, graph, excluded)),
        (check_empty_docs, lambda: _check_empty_documents(dag, graph, excluded)),
        (check_empty_text, lambda: _check_empty_text(graph, excluded)),
        (check_item_cycles, lambda: _check_item_link_cycles(graph, excluded)),
        (check_unlinked, lambda: _check_unlinked_items(dag, graph, excluded)),
    )
    for enabled, run_check in optional_checks:
        if enabled:
            found.extend(run_check())

    return found
def _check_links(
    dag: DocumentDAG,
    graph: TraceabilityGraph,
    skip: set[str],
    check_self_links: bool = True,
) -> list[ValidationIssue]:
    """Check link validity and conformance.

    Validates every link on every active, non-skipped item in the
    traceability graph. The following conditions are flagged:

    * Self-links (item links to its own UID) — warning, only when
      ``check_self_links`` is true.
    * Links to non-existent items — error.
    * Links to inactive items — error.
    * Links from non-normative items (items that have links but are not
      of type ``requirement``) — warning, once per item.
    * Links to non-normative target items — warning.
    * Links that violate parent-document conformance (the target item
      belongs to a document that is not a parent of the source item's
      document in the DAG) — warning.

    Args:
        dag: The document DAG used to determine parent-child document
            relationships.
        graph: The traceability graph containing all items and their links.
        skip: Set of document prefixes to exclude from validation.
        check_self_links: Whether to flag items that link to themselves.

    Returns:
        A list of ``ValidationIssue`` objects, one per detected problem.
    """
    issues: list[ValidationIssue] = []

    for uid, item in graph.items.items():
        if item.document_prefix in skip or not item.active:
            continue

        # Items whose document is unknown to the DAG get no conformance check.
        parents = dag.get_parents(item.document_prefix) if item.document_prefix in dag.documents else []

        # Check non-normative item has links
        if item.type != "requirement" and item.links:
            issues.append(
                ValidationIssue(
                    "warning",
                    uid,
                    item.document_prefix,
                    "non-normative item has links",
                )
            )

        for link in item.links:
            # Check self-link
            if check_self_links and link == uid:
                issues.append(
                    ValidationIssue(
                        "warning",
                        uid,
                        item.document_prefix,
                        "links to itself",
                    )
                )
                continue

            # Check link target exists
            if link not in graph.items:
                issues.append(
                    ValidationIssue(
                        "error",
                        uid,
                        item.document_prefix,
                        f"links to non-existent item: {link}",
                    )
                )
                continue

            # Check link to inactive item
            target = graph.items[link]
            if not target.active:
                issues.append(
                    ValidationIssue(
                        "error",
                        uid,
                        item.document_prefix,
                        f"links to inactive item: {link}",
                    )
                )
                continue

            # Check link to non-normative item
            if target.type != "requirement":
                issues.append(
                    ValidationIssue(
                        "warning",
                        uid,
                        item.document_prefix,
                        f"links to non-normative item: {link}",
                    )
                )

            # Check link conformance (links to parent document)
            if parents and target.document_prefix not in parents:
                issues.append(
                    ValidationIssue(
                        "warning",
                        uid,
                        item.document_prefix,
                        f"links to {link} in document {target.document_prefix}, "
                        f"which is not a parent document "
                        f"(expected: {', '.join(parents)})",
                    )
                )

    return issues


def _item_content_hash(item):
    """Return the content hash of *item* as used for link and review checks.

    The hash covers the item's ``text``, ``header``, ``links`` and ``type``.
    Keeping the payload in one place ensures the suspect-link check and
    the review check always hash the same set of fields.
    """
    return compute_content_hash(
        {
            "text": item.text,
            "header": item.header,
            "links": item.links,
            "type": item.type,
        }
    )


def _check_suspect_links(dag: DocumentDAG, graph: TraceabilityGraph, skip: set[str]) -> list[ValidationIssue]:
    """Check for suspect links by comparing stored hashes to current content.

    A link is considered *suspect* when the hash stored for it in the
    linking item's ``link_hashes`` no longer matches the current content
    hash of the target item. Links that have no stored hash at all are
    also flagged, since they cannot be verified.

    For each active, non-skipped item the function reads the raw item
    file (``<uid>.yml`` in the document's directory) to obtain
    ``link_hashes``, recomputes the content hash of every linked target,
    and compares the two values. Items whose document path or item file
    cannot be found are skipped with a logged warning.

    Args:
        dag: The document DAG, used to resolve file paths for raw item data.
        graph: The traceability graph containing all items and their links.
        skip: Set of document prefixes to exclude from validation.

    Returns:
        A list of ``ValidationIssue`` objects with level ``warning`` for
        each suspect link (hash mismatch) and each link missing a stored
        hash.
    """
    issues: list[ValidationIssue] = []

    for uid, item in graph.items.items():
        if item.document_prefix in skip or not item.active:
            continue

        # Read raw item to get link hashes
        doc_path = dag.document_paths.get(item.document_prefix)
        if doc_path is None:
            logger.warning("Document path not found for prefix: %s", item.document_prefix)
            continue

        item_path = doc_path / f"{uid}.yml"
        if not item_path.exists():
            logger.warning("Item file not found: %s", item_path)
            continue

        raw_item = read_item(item_path, item.document_prefix)
        # ``or {}`` rather than a ``.get`` default: a key that is present
        # but holds ``None`` would otherwise crash on ``.items()`` below.
        link_hashes = raw_item.get("link_hashes") or {}

        for link_uid, stored_hash in link_hashes.items():
            # Broken and inactive targets are reported by _check_links.
            if link_uid not in graph.items:
                continue
            target = graph.items[link_uid]
            if not target.active:
                continue

            if stored_hash != _item_content_hash(target):
                issues.append(
                    ValidationIssue(
                        "warning",
                        uid,
                        item.document_prefix,
                        f"suspect link to {link_uid} (content may have changed; run 'jamb review clear' to re-verify)",
                    )
                )

        # Check for links with no stored hash
        for link_uid in item.links:
            if link_uid in link_hashes:
                continue  # already checked above
            if link_uid not in graph.items:
                continue  # broken link, caught by _check_links
            if not graph.items[link_uid].active:
                continue
            issues.append(
                ValidationIssue(
                    "warning",
                    uid,
                    item.document_prefix,
                    f"link to {link_uid} has no stored hash (run 'jamb review clear' to verify links)",
                )
            )

    return issues


def _check_review_status(graph: TraceabilityGraph, skip: set[str]) -> list[ValidationIssue]:
    """Check that items have been reviewed and review hash matches current content.

    Ensures every active normative (``requirement``) item has been
    reviewed. Two conditions are flagged:

    * The item has never been reviewed (``reviewed`` field is falsy).
    * The item has been modified since its last review, detected by
      comparing the stored ``reviewed`` value against a freshly computed
      content hash.

    Args:
        graph: The traceability graph containing all items.
        skip: Set of document prefixes to exclude from validation.

    Returns:
        A list of ``ValidationIssue`` objects with level ``warning`` for
        each unreviewed or stale-reviewed item.
    """
    issues: list[ValidationIssue] = []

    for uid, item in graph.items.items():
        if item.document_prefix in skip or not item.active:
            continue
        if item.type != "requirement":
            continue

        if not item.reviewed:
            issues.append(
                ValidationIssue(
                    "warning",
                    uid,
                    item.document_prefix,
                    "has not been reviewed (run 'jamb review mark' to mark as reviewed)",
                )
            )
        elif item.reviewed != _item_content_hash(item):
            issues.append(
                ValidationIssue(
                    "warning",
                    uid,
                    item.document_prefix,
                    "has been modified since last review (run 'jamb review mark' to re-approve)",
                )
            )

    return issues


def _check_children(dag: DocumentDAG, graph: TraceabilityGraph, skip: set[str]) -> list[ValidationIssue]:
    """Check that non-leaf document items have children linking to them.

    For every active normative item that belongs to a non-leaf document
    (a document not returned by ``dag.get_leaf_documents()``), verifies
    that at least one active item in the graph links to it. Items in
    leaf documents are excluded because they have no child documents
    from which links would originate.

    Args:
        dag: The document DAG, used to identify leaf documents.
        graph: The traceability graph containing all items and their links.
        skip: Set of document prefixes to exclude from validation.

    Returns:
        A list of ``ValidationIssue`` objects with level ``warning`` for
        each non-leaf-document item that has no item linking to it.
    """
    issues: list[ValidationIssue] = []
    leaf_docs = set(dag.get_leaf_documents())

    # UIDs that are the target of at least one link from an active item.
    # Linking items in skipped documents still count as links.
    linked_to: set[str] = {link for item in graph.items.values() if item.active for link in item.links}

    for uid, item in graph.items.items():
        if item.document_prefix in skip or not item.active:
            continue
        if item.type != "requirement":
            continue
        if item.document_prefix in leaf_docs:
            continue

        if uid not in linked_to:
            issues.append(
                ValidationIssue(
                    "warning",
                    uid,
                    item.document_prefix,
                    "has no children linking to it from child documents",
                )
            )

    return issues


def _check_empty_documents(dag: DocumentDAG, graph: TraceabilityGraph, skip: set[str]) -> list[ValidationIssue]:
    """Check for documents that contain no items.

    Iterates over every document registered in the DAG and flags those
    that have zero items (active or not) in the traceability graph.

    Args:
        dag: The document DAG, providing the set of known document prefixes.
        graph: The traceability graph containing all items.
        skip: Set of document prefixes to exclude from validation.

    Returns:
        A list of ``ValidationIssue`` objects with level ``warning`` for
        each document that contains no items.
    """
    prefixes_with_items = {item.document_prefix for item in graph.items.values()}

    return [
        ValidationIssue(
            "warning",
            None,
            prefix,
            "document contains no items",
        )
        for prefix in dag.documents
        if prefix not in skip and prefix not in prefixes_with_items
    ]


def _check_empty_text(graph: TraceabilityGraph, skip: set[str]) -> list[ValidationIssue]:
    """Check for items with empty or whitespace-only text.

    Flags every active, non-skipped item whose ``text`` field is empty
    or contains only whitespace characters.

    Args:
        graph: The traceability graph containing all items.
        skip: Set of document prefixes to exclude from validation.

    Returns:
        A list of ``ValidationIssue`` objects with level ``warning`` for
        each item that has empty text.
    """
    issues: list[ValidationIssue] = []

    for uid, item in graph.items.items():
        if item.document_prefix in skip or not item.active:
            continue
        if not item.text or not item.text.strip():
            issues.append(
                ValidationIssue(
                    "warning",
                    uid,
                    item.document_prefix,
                    "has empty text",
                )
            )

    return issues


def _check_item_link_cycles(graph: TraceabilityGraph, skip: set[str]) -> list[ValidationIssue]:
    """Detect cycles in the item-to-item link graph using DFS.

    Builds a directed graph where each active, non-skipped item is a
    node and each link from one such item to another is an edge. An
    iterative depth-first search with three-color marking
    (white/gray/black) detects back edges, which indicate cycles.

    Each unique cycle (identified by its set of member UIDs) is reported
    at most once. The issue message includes the full cycle path. DFS
    roots are visited in sorted UID order so that the reported paths and
    the order of the issues are the same on every run.

    Args:
        graph: The traceability graph containing all items and their links.
        skip: Set of document prefixes to exclude from validation.

    Returns:
        A list of ``ValidationIssue`` objects with level ``error`` for
        each distinct cycle found in the item link graph.
    """
    issues: list[ValidationIssue] = []
    reported_cycles: set[frozenset[str]] = set()

    # Build adjacency: item -> items it links to (only active, non-skipped)
    active_uids = {uid for uid, item in graph.items.items() if item.active and item.document_prefix not in skip}
    adjacency: dict[str, list[str]] = {
        uid: [lk for lk in graph.items[uid].links if lk in active_uids] for uid in active_uids
    }

    WHITE, GRAY, BLACK = 0, 1, 2  # unvisited / on the current DFS path / finished
    color: dict[str, int] = dict.fromkeys(active_uids, WHITE)
    path: list[str] = []

    # Iterate in sorted order: the iteration order of a set of strings
    # changes between interpreter runs (hash randomization), which would
    # make the reported cycle path and the issue order non-reproducible.
    for start_uid in sorted(active_uids):
        if color[start_uid] != WHITE:
            continue

        # Stack of (uid, index of the next outgoing link to explore)
        stack: list[tuple[str, int]] = [(start_uid, 0)]
        color[start_uid] = GRAY
        path.append(start_uid)

        while stack:
            uid, link_idx = stack[-1]
            active_links = adjacency[uid]

            if link_idx < len(active_links):
                # Advance the index for the current frame
                stack[-1] = (uid, link_idx + 1)
                link = active_links[link_idx]

                if color[link] == GRAY:
                    # Back edge: `link` is on the current path, so the
                    # path from it to the top of the stack is a cycle.
                    cycle_start = path.index(link)
                    cycle_uids = path[cycle_start:]
                    cycle_members = frozenset(cycle_uids)
                    if cycle_members not in reported_cycles:
                        reported_cycles.add(cycle_members)
                        # Report all UIDs involved in the cycle for clarity
                        affected_uids = ", ".join(sorted(cycle_members))
                        issues.append(
                            ValidationIssue(
                                "error",
                                link,
                                graph.items[link].document_prefix,
                                f"cycle in item links: {' -> '.join(cycle_uids)} -> {link} (affects: {affected_uids})",
                            )
                        )
                elif color[link] == WHITE:
                    color[link] = GRAY
                    path.append(link)
                    stack.append((link, 0))
                # BLACK targets are fully explored already; nothing to do.
            else:
                # All links processed, backtrack
                stack.pop()
                path.pop()
                color[uid] = BLACK

    return issues


def _check_unlinked_items(dag: DocumentDAG, graph: TraceabilityGraph, skip: set[str]) -> list[ValidationIssue]:
    """Check for normative non-derived items in child documents with no links.

    Flags active normative (``requirement``) items that belong to a
    document with parent documents in the DAG yet have an empty links
    list. Derived items are excluded, as are items in root documents
    (no parents) and items whose document is not registered in the DAG.

    Args:
        dag: The document DAG, used to determine whether an item's
            document has parent documents.
        graph: The traceability graph containing all items and their links.
        skip: Set of document prefixes to exclude from validation.

    Returns:
        A list of ``ValidationIssue`` objects with level ``warning`` for
        each unlinked normative non-derived item in a child document.
    """
    issues: list[ValidationIssue] = []

    for uid, item in graph.items.items():
        if item.document_prefix in skip or not item.active:
            continue
        if item.type != "requirement":
            continue
        if item.derived:
            continue

        # Check if this document has parents (i.e., it's a child document)
        parents = dag.get_parents(item.document_prefix) if item.document_prefix in dag.documents else []
        if not parents:
            continue

        if not item.links:
            issues.append(
                ValidationIssue(
                    "warning",
                    uid,
                    item.document_prefix,
                    "normative non-derived item has no links to parent document"
                    " (add links or set 'derived: true' to suppress)",
                )
            )

    return issues