Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 148 additions & 6 deletions nodescraper/plugins/inband/cmdline/analyzer_args.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,31 @@
# SOFTWARE.
#
###############################################################################
from typing import Union
from typing import Dict, List, Optional, Tuple, Union

from pydantic import Field, field_validator
from pydantic import Field, field_validator, model_validator

from nodescraper.models import AnalyzerArgs
from nodescraper.plugins.inband.cmdline.cmdlineconfig import (
CmdlineConflictError,
CmdlineOverride,
ConflictType,
OverrideConfig,
ParameterValueConflict,
RequiredVsBannedConflict,
)
from nodescraper.plugins.inband.cmdline.cmdlinedata import CmdlineDataModel


class CmdlineAnalyzerArgs(AnalyzerArgs):
required_cmdline: Union[str, list] = Field(default_factory=list)
banned_cmdline: Union[str, list] = Field(default_factory=list)
required_cmdline: Union[str, List] = Field(default_factory=list)
banned_cmdline: Union[str, List] = Field(default_factory=list)
os_overrides: Dict[str, OverrideConfig] = Field(default_factory=dict)
platform_overrides: Dict[str, OverrideConfig] = Field(default_factory=dict)

@field_validator("required_cmdline", mode="before")
@classmethod
def validate_required_cmdline(cls, required_cmdline: Union[str, list]) -> list:
def validate_required_cmdline(cls, required_cmdline: Union[str, List]) -> List:
"""support str or list input for required_cmdline

Args:
Expand All @@ -53,7 +63,7 @@ def validate_required_cmdline(cls, required_cmdline: Union[str, list]) -> list:

@field_validator("banned_cmdline", mode="before")
@classmethod
def validate_banned_cmdline(cls, banned_cmdline: Union[str, list]) -> list:
def validate_banned_cmdline(cls, banned_cmdline: Union[str, List]) -> List:
"""support str or list input for banned_cmdline

Args:
Expand All @@ -67,6 +77,138 @@ def validate_banned_cmdline(cls, banned_cmdline: Union[str, list]) -> list:

return banned_cmdline

@model_validator(mode="after")
def validate_no_conflicts(self) -> "CmdlineAnalyzerArgs":
"""Validate configuration for conflicts that can be detected at config-time.

Checks base configuration for conflicts.
Full validation with OS/platform context happens at runtime in get_effective_config().
"""
# Check base configuration for conflicts
base_conflicts = set(self.required_cmdline) & set(self.banned_cmdline)
if base_conflicts:
raise CmdlineConflictError(
ConflictType.REQUIRED_VS_BANNED,
RequiredVsBannedConflict(
conflicting_parameters=list(base_conflicts), source="base configuration"
),
)

# Check for conflicting parameter values in base configuration
self._check_parameter_value_conflicts(self.required_cmdline, "base configuration") # type: ignore[arg-type]

# Validate each override configuration independently
# We can't validate cross-override conflicts here because we don't know
# which overrides will actually be applied (depends on runtime OS/platform)
for os_name, override in self.os_overrides.items():
self._validate_override(override, f"os_override: {os_name}")

for platform_name, override in self.platform_overrides.items():
self._validate_override(override, f"platform_override: {platform_name}")

return self

def _validate_override(self, override: OverrideConfig, source: str) -> None:
"""Validate a single override configuration.

CONFIG-TIME VALIDATION: Checks for conflicts within a single override.
Cannot check conflicts between overrides as we don't know which will apply.
"""
# Check if any parameters are both added to required and banned
required_adds = set(override.required_cmdline.add)
banned_adds = set(override.banned_cmdline.add)

conflicts = required_adds & banned_adds
if conflicts:
raise CmdlineConflictError(
ConflictType.REQUIRED_VS_BANNED,
RequiredVsBannedConflict(conflicting_parameters=list(conflicts), source=source),
)

def _check_parameter_value_conflicts(self, params: List[str], source: str) -> None:
"""Check for conflicting parameter values (e.g., pci=bfsort vs pci=noats)."""
param_values: Dict[str, str] = {}

for param in params:
if "=" in param:
key, value = param.split("=", 1)
if key in param_values and param_values[key] != value:
raise CmdlineConflictError(
ConflictType.PARAMETER_VALUE_CONFLICT,
ParameterValueConflict(
parameter=key,
conflicting_values=[f"{key}={param_values[key]}", f"{key}={value}"],
source=source,
),
)
param_values[key] = value

def _apply_override(self, base_list: List[str], override: CmdlineOverride) -> List[str]:
"""Apply add/remove operations from override configuration."""
result = base_list.copy()

# Process removes first
for item in override.remove:
if item in result:
result.remove(item)

# Then process adds
for item in override.add:
if item not in result:
result.append(item)

return result

def get_effective_config(
self, os_id: Optional[str] = None, platform: Optional[str] = None
) -> Tuple[List[str], List[str]]:
"""Get effective cmdline configuration based on OS and platform overrides.

RUNTIME VALIDATION: Applies overrides and validates final configuration.

Args:
os_id: Operating system identifier (e.g., 'ubuntu', 'rhel')
platform: Platform identifier (e.g., 'mi300x', 'mi250')

Returns:
tuple: (effective_required, effective_banned) lists of cmdline arguments

Raises:
CmdlineConflictError: If the final configuration has conflicts
"""
required = list(self.required_cmdline)
banned = list(self.banned_cmdline)

# Apply OS overrides if os_id is provided and matches
if os_id and os_id in self.os_overrides:
os_override = self.os_overrides[os_id]
required = self._apply_override(required, os_override.required_cmdline)
banned = self._apply_override(banned, os_override.banned_cmdline)

# Apply platform overrides if platform is provided and matches
if platform and platform in self.platform_overrides:
platform_override = self.platform_overrides[platform]
required = self._apply_override(required, platform_override.required_cmdline)
banned = self._apply_override(banned, platform_override.banned_cmdline)

# RUNTIME VALIDATION: Check final configuration for conflicts
conflicts = set(required) & set(banned)
if conflicts:
raise CmdlineConflictError(
ConflictType.REQUIRED_VS_BANNED,
RequiredVsBannedConflict(
conflicting_parameters=list(conflicts),
source=f"final configuration for OS '{os_id}' and platform '{platform}'",
),
)

# Check for parameter value conflicts in final configuration
self._check_parameter_value_conflicts(
required, f"final configuration for OS '{os_id}' and platform '{platform}'"
)

return required, banned

@classmethod
def build_from_model(cls, datamodel: CmdlineDataModel) -> "CmdlineAnalyzerArgs":
"""build analyzer args from data model
Expand Down
71 changes: 68 additions & 3 deletions nodescraper/plugins/inband/cmdline/cmdline_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,21 @@
# SOFTWARE.
#
###############################################################################
from typing import Optional
from typing import List, Optional

from pydantic import ValidationError

from nodescraper.enums import EventCategory, EventPriority, ExecutionStatus
from nodescraper.interfaces import DataAnalyzer
from nodescraper.models import TaskResult

from .analyzer_args import CmdlineAnalyzerArgs
from .cmdlineconfig import (
CmdlineConflictError,
ConflictType,
ParameterValueConflict,
RequiredVsBannedConflict,
)
from .cmdlinedata import CmdlineDataModel


Expand All @@ -38,7 +46,7 @@ class CmdlineAnalyzer(DataAnalyzer[CmdlineDataModel, CmdlineAnalyzerArgs]):

DATA_MODEL = CmdlineDataModel

def _compare_cmdline(self, cmdline: str, required_cmdline: list, banned_cmdline: list) -> bool:
def _compare_cmdline(self, cmdline: str, required_cmdline: List, banned_cmdline: List) -> bool:
"""Compare the kernel cmdline against required and banned cmdline arguments.

Args:
Expand Down Expand Up @@ -92,9 +100,66 @@ def analyze_data(
self.result.status = ExecutionStatus.NOT_RAN
return self.result

# Get OS and platform info if available
os_id = getattr(self, "os_id", None) if hasattr(self, "os_id") else None
platform = (
getattr(self.system_info, "platform", None) if hasattr(self, "system_info") else None
)

try:
# Get effective configuration based on OS and platform overrides
# This performs runtime validation with the actual OS/platform context
effective_required, effective_banned = args.get_effective_config(os_id, platform)

except ValidationError as e:
# Pydantic validation failed - configuration is invalid
self.result.status = ExecutionStatus.ERROR
self.result.message = "Invalid CmdlineAnalyzer configuration"

# Log all validation errors in a single event
self._log_event(
category=EventCategory.RUNTIME,
description="Pydantic validation errors on configuration",
data={"errors": e.errors(include_url=False)},
priority=EventPriority.CRITICAL,
console_log=True,
)
return self.result

except CmdlineConflictError as e:
# Runtime validation failed (conflicts detected)
self.result.status = ExecutionStatus.ERROR
self.result.message = str(e)

# Build conflict data from structured exception
conflict_data = {
"error_type": "configuration_conflict",
"conflict_type": e.conflict_type.value,
}

# Add specific details based on conflict type
if e.conflict_type == ConflictType.REQUIRED_VS_BANNED:
assert isinstance(e.details, RequiredVsBannedConflict)
conflict_data["conflicting_parameters"] = e.details.conflicting_parameters
conflict_data["source"] = e.details.source
elif e.conflict_type == ConflictType.PARAMETER_VALUE_CONFLICT:
assert isinstance(e.details, ParameterValueConflict)
conflict_data["parameter"] = e.details.parameter
conflict_data["conflicting_values"] = e.details.conflicting_values
conflict_data["source"] = e.details.source

self._log_event(
category=EventCategory.RUNTIME,
description="CmdlineAnalyzer configuration conflict detected",
priority=EventPriority.ERROR,
data=conflict_data,
console_log=True,
)
return self.result

# check if any of the cmdline defined in the list match the actual kernel cmdline
check, missing_required, found_banned = self._compare_cmdline(
data.cmdline, args.required_cmdline, args.banned_cmdline
data.cmdline, effective_required, effective_banned
)

if check:
Expand Down
Loading