diff --git a/nodescraper/plugins/inband/cmdline/analyzer_args.py b/nodescraper/plugins/inband/cmdline/analyzer_args.py index 88fd431d..e4a34422 100644 --- a/nodescraper/plugins/inband/cmdline/analyzer_args.py +++ b/nodescraper/plugins/inband/cmdline/analyzer_args.py @@ -23,21 +23,31 @@ # SOFTWARE. # ############################################################################### -from typing import Union +from typing import Dict, List, Optional, Tuple, Union -from pydantic import Field, field_validator +from pydantic import Field, field_validator, model_validator from nodescraper.models import AnalyzerArgs +from nodescraper.plugins.inband.cmdline.cmdlineconfig import ( + CmdlineConflictError, + CmdlineOverride, + ConflictType, + OverrideConfig, + ParameterValueConflict, + RequiredVsBannedConflict, +) from nodescraper.plugins.inband.cmdline.cmdlinedata import CmdlineDataModel class CmdlineAnalyzerArgs(AnalyzerArgs): - required_cmdline: Union[str, list] = Field(default_factory=list) - banned_cmdline: Union[str, list] = Field(default_factory=list) + required_cmdline: Union[str, List] = Field(default_factory=list) + banned_cmdline: Union[str, List] = Field(default_factory=list) + os_overrides: Dict[str, OverrideConfig] = Field(default_factory=dict) + platform_overrides: Dict[str, OverrideConfig] = Field(default_factory=dict) @field_validator("required_cmdline", mode="before") @classmethod - def validate_required_cmdline(cls, required_cmdline: Union[str, list]) -> list: + def validate_required_cmdline(cls, required_cmdline: Union[str, List]) -> List: """support str or list input for required_cmdline Args: @@ -53,7 +63,7 @@ def validate_required_cmdline(cls, required_cmdline: Union[str, list]) -> list: @field_validator("banned_cmdline", mode="before") @classmethod - def validate_banned_cmdline(cls, banned_cmdline: Union[str, list]) -> list: + def validate_banned_cmdline(cls, banned_cmdline: Union[str, List]) -> List: """support str or list input for banned_cmdline Args: @@ -67,6 +77,138 @@ def validate_banned_cmdline(cls, banned_cmdline: Union[str, list]) -> list: return banned_cmdline + @model_validator(mode="after") + def validate_no_conflicts(self) -> "CmdlineAnalyzerArgs": + """Validate configuration for conflicts that can be detected at config-time. + + Checks base configuration for conflicts. + Full validation with OS/platform context happens at runtime in get_effective_config(). + """ + # Check base configuration for conflicts + base_conflicts = set(self.required_cmdline) & set(self.banned_cmdline) + if base_conflicts: + raise CmdlineConflictError( + ConflictType.REQUIRED_VS_BANNED, + RequiredVsBannedConflict( + conflicting_parameters=list(base_conflicts), source="base configuration" + ), + ) + + # Check for conflicting parameter values in base configuration + self._check_parameter_value_conflicts(self.required_cmdline, "base configuration") # type: ignore[arg-type] + + # Validate each override configuration independently + # We can't validate cross-override conflicts here because we don't know + # which overrides will actually be applied (depends on runtime OS/platform) + for os_name, override in self.os_overrides.items(): + self._validate_override(override, f"os_override: {os_name}") + + for platform_name, override in self.platform_overrides.items(): + self._validate_override(override, f"platform_override: {platform_name}") + + return self + + def _validate_override(self, override: OverrideConfig, source: str) -> None: + """Validate a single override configuration. + + CONFIG-TIME VALIDATION: Checks for conflicts within a single override. + Cannot check conflicts between overrides as we don't know which will apply. + """ + # Check if any parameters are both added to required and banned + required_adds = set(override.required_cmdline.add) + banned_adds = set(override.banned_cmdline.add) + + conflicts = required_adds & banned_adds + if conflicts: + raise CmdlineConflictError( + ConflictType.REQUIRED_VS_BANNED, + RequiredVsBannedConflict(conflicting_parameters=list(conflicts), source=source), + ) + + def _check_parameter_value_conflicts(self, params: List[str], source: str) -> None: + """Check for conflicting parameter values (e.g., pci=bfsort vs pci=noats).""" + param_values: Dict[str, str] = {} + + for param in params: + if "=" in param: + key, value = param.split("=", 1) + if key in param_values and param_values[key] != value: + raise CmdlineConflictError( + ConflictType.PARAMETER_VALUE_CONFLICT, + ParameterValueConflict( + parameter=key, + conflicting_values=[f"{key}={param_values[key]}", f"{key}={value}"], + source=source, + ), + ) + param_values[key] = value + + def _apply_override(self, base_list: List[str], override: CmdlineOverride) -> List[str]: + """Apply add/remove operations from override configuration.""" + result = base_list.copy() + + # Process removes first + for item in override.remove: + if item in result: + result.remove(item) + + # Then process adds + for item in override.add: + if item not in result: + result.append(item) + + return result + + def get_effective_config( + self, os_id: Optional[str] = None, platform: Optional[str] = None + ) -> Tuple[List[str], List[str]]: + """Get effective cmdline configuration based on OS and platform overrides. + + RUNTIME VALIDATION: Applies overrides and validates final configuration. + + Args: + os_id: Operating system identifier (e.g., 'ubuntu', 'rhel') + platform: Platform identifier (e.g., 'mi300x', 'mi250') + + Returns: + tuple: (effective_required, effective_banned) lists of cmdline arguments + + Raises: + CmdlineConflictError: If the final configuration has conflicts + """ + required = list(self.required_cmdline) + banned = list(self.banned_cmdline) + + # Apply OS overrides if os_id is provided and matches + if os_id and os_id in self.os_overrides: + os_override = self.os_overrides[os_id] + required = self._apply_override(required, os_override.required_cmdline) + banned = self._apply_override(banned, os_override.banned_cmdline) + + # Apply platform overrides if platform is provided and matches + if platform and platform in self.platform_overrides: + platform_override = self.platform_overrides[platform] + required = self._apply_override(required, platform_override.required_cmdline) + banned = self._apply_override(banned, platform_override.banned_cmdline) + + # RUNTIME VALIDATION: Check final configuration for conflicts + conflicts = set(required) & set(banned) + if conflicts: + raise CmdlineConflictError( + ConflictType.REQUIRED_VS_BANNED, + RequiredVsBannedConflict( + conflicting_parameters=list(conflicts), + source=f"final configuration for OS '{os_id}' and platform '{platform}'", + ), + ) + + # Check for parameter value conflicts in final configuration + self._check_parameter_value_conflicts( + required, f"final configuration for OS '{os_id}' and platform '{platform}'" + ) + + return required, banned + @classmethod def build_from_model(cls, datamodel: CmdlineDataModel) -> "CmdlineAnalyzerArgs": """build analyzer args from data model diff --git a/nodescraper/plugins/inband/cmdline/cmdline_analyzer.py b/nodescraper/plugins/inband/cmdline/cmdline_analyzer.py index 71bc70d5..f36fa950 100644 --- a/nodescraper/plugins/inband/cmdline/cmdline_analyzer.py +++ b/nodescraper/plugins/inband/cmdline/cmdline_analyzer.py @@ -23,13 +23,21 @@ # SOFTWARE. # ############################################################################### -from typing import Optional +from typing import List, Optional + +from pydantic import ValidationError from nodescraper.enums import EventCategory, EventPriority, ExecutionStatus from nodescraper.interfaces import DataAnalyzer from nodescraper.models import TaskResult from .analyzer_args import CmdlineAnalyzerArgs +from .cmdlineconfig import ( + CmdlineConflictError, + ConflictType, + ParameterValueConflict, + RequiredVsBannedConflict, +) from .cmdlinedata import CmdlineDataModel @@ -38,7 +46,7 @@ class CmdlineAnalyzer(DataAnalyzer[CmdlineDataModel, CmdlineAnalyzerArgs]): DATA_MODEL = CmdlineDataModel - def _compare_cmdline(self, cmdline: str, required_cmdline: list, banned_cmdline: list) -> bool: + def _compare_cmdline(self, cmdline: str, required_cmdline: List, banned_cmdline: List) -> bool: """Compare the kernel cmdline against required and banned cmdline arguments. Args: @@ -92,9 +100,66 @@ def analyze_data( self.result.status = ExecutionStatus.NOT_RAN return self.result + # Get OS and platform info if available + os_id = getattr(self, "os_id", None) if hasattr(self, "os_id") else None + platform = ( + getattr(self.system_info, "platform", None) if hasattr(self, "system_info") else None + ) + + try: + # Get effective configuration based on OS and platform overrides + # This performs runtime validation with the actual OS/platform context + effective_required, effective_banned = args.get_effective_config(os_id, platform) + + except ValidationError as e: + # Pydantic validation failed - configuration is invalid + self.result.status = ExecutionStatus.ERROR + self.result.message = "Invalid CmdlineAnalyzer configuration" + + # Log all validation errors in a single event + self._log_event( + category=EventCategory.RUNTIME, + description="Pydantic validation errors on configuration", + data={"errors": e.errors(include_url=False)}, + priority=EventPriority.CRITICAL, + console_log=True, + ) + return self.result + + except CmdlineConflictError as e: + # Runtime validation failed (conflicts detected) + self.result.status = ExecutionStatus.ERROR + self.result.message = str(e) + + # Build conflict data from structured exception + conflict_data = { + "error_type": "configuration_conflict", + "conflict_type": e.conflict_type.value, + } + + # Add specific details based on conflict type + if e.conflict_type == ConflictType.REQUIRED_VS_BANNED: + assert isinstance(e.details, RequiredVsBannedConflict) + conflict_data["conflicting_parameters"] = e.details.conflicting_parameters + conflict_data["source"] = e.details.source + elif e.conflict_type == ConflictType.PARAMETER_VALUE_CONFLICT: + assert isinstance(e.details, ParameterValueConflict) + conflict_data["parameter"] = e.details.parameter + conflict_data["conflicting_values"] = e.details.conflicting_values + conflict_data["source"] = e.details.source + + self._log_event( + category=EventCategory.RUNTIME, + description="CmdlineAnalyzer configuration conflict detected", + priority=EventPriority.ERROR, + data=conflict_data, + console_log=True, + ) + return self.result + # check if any of the cmdline defined in the list match the actual kernel cmdline check, missing_required, found_banned = self._compare_cmdline( - data.cmdline, args.required_cmdline, args.banned_cmdline + data.cmdline, effective_required, effective_banned ) if check: diff --git a/nodescraper/plugins/inband/cmdline/cmdlineconfig.py b/nodescraper/plugins/inband/cmdline/cmdlineconfig.py new file mode 100644 index 00000000..a8b64744 --- /dev/null +++ b/nodescraper/plugins/inband/cmdline/cmdlineconfig.py @@ -0,0 +1,120 @@ +############################################################################### +# +# MIT License +# +# Copyright (c) 2025 Advanced Micro Devices, Inc. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# +############################################################################### +"""Cmdline configuration models and validation classes.""" + +from enum import Enum +from typing import Any, List, Union + +from pydantic import BaseModel, field_validator + + +class ConflictType(Enum): + """Types of configuration conflicts that can occur.""" + + REQUIRED_VS_BANNED = "required_vs_banned" + PARAMETER_VALUE_CONFLICT = "parameter_value_conflict" + + +class RequiredVsBannedConflict(BaseModel): + """Details for a required vs banned conflict.""" + + conflicting_parameters: List[str] + source: str # e.g., "base configuration", "os_override: ubuntu", etc. + + +class ParameterValueConflict(BaseModel): + """Details for a parameter value conflict.""" + + parameter: str # e.g., "pci" + conflicting_values: List[str] # e.g., ["pci=bfsort", "pci=noats"] + source: str # e.g., "final configuration for OS 'centos' and platform 'grand-teton'" + + +class CmdlineConflictError(Exception): + """Exception raised when cmdline configuration has conflicts.""" + + def __init__( + self, + conflict_type: ConflictType, + details: Union[RequiredVsBannedConflict, ParameterValueConflict], + ): + """Initialize the conflict error. + + Args: + conflict_type: Type of conflict from ConflictType enum + details: Structured conflict details (type depends on conflict_type) + """ + self.conflict_type = conflict_type + self.details = details + super().__init__(self._format_message()) + + def _format_message(self) -> str: + """Format a human-readable error message based on conflict type.""" + if self.conflict_type == ConflictType.REQUIRED_VS_BANNED: + assert isinstance(self.details, RequiredVsBannedConflict) + return f"Parameters cannot be both required and banned in {self.details.source}: {self.details.conflicting_parameters}" + elif self.conflict_type == ConflictType.PARAMETER_VALUE_CONFLICT: + assert isinstance(self.details, ParameterValueConflict) + return f"Conflicting values for parameter '{self.details.parameter}' in {self.details.source}: {' vs '.join(self.details.conflicting_values)}" + else: + return f"Configuration conflict: {self.conflict_type.value}" + + +class CmdlineOverride(BaseModel): + """Model for cmdline override configuration. + + This model represents the add/remove operations for cmdline parameters. + Validation happens at config-time to ensure proper structure. + """ + + add: List[str] = [] + remove: List[str] = [] + + @field_validator("add", "remove", mode="before") + @classmethod + def ensure_list(cls, v: Any) -> List[str]: + """Ensure add/remove are always lists. + + CONFIG-TIME VALIDATION: Converts various input formats to lists. + This prevents runtime errors from malformed configuration. + """ + if isinstance(v, str): + return [v] + elif isinstance(v, dict) and not v: # Empty dict + return [] + elif v is None: + return [] + return v + + +class OverrideConfig(BaseModel): + """Model for OS/platform override configuration. + + Contains overrides for both required and banned cmdline parameters. + """ + + required_cmdline: CmdlineOverride = CmdlineOverride() + banned_cmdline: CmdlineOverride = CmdlineOverride()