178 lines
6.2 KiB
Python
178 lines
6.2 KiB
Python
"""Configuration validation engine"""
|
|
from typing import List, Dict, Any, Tuple
|
|
import ipaddress
|
|
|
|
|
|
class ValidationError:
|
|
"""Single validation error/warning"""
|
|
def __init__(self, message: str, severity: str = "error"):
|
|
self.message = message
|
|
self.severity = severity # "error" or "warning"
|
|
|
|
def to_dict(self) -> Dict[str, str]:
|
|
return {"message": self.message, "severity": self.severity}
|
|
|
|
|
|
class ConfigValidator:
|
|
"""
|
|
Validates Cisco configuration for common issues.
|
|
|
|
Design:
|
|
- Separate validation rules by feature
|
|
- Return errors AND warnings
|
|
- Idempotent (same config always produces same validation)
|
|
"""
|
|
|
|
def __init__(self):
|
|
self.errors: List[ValidationError] = []
|
|
self.warnings: List[ValidationError] = []
|
|
|
|
def validate(self, config_data: Dict[str, Any]) -> Tuple[List[Dict], List[Dict]]:
|
|
"""
|
|
Validate entire configuration.
|
|
|
|
Returns:
|
|
(errors, warnings) - each is list of dicts with "message" key
|
|
"""
|
|
self.errors = []
|
|
self.warnings = []
|
|
|
|
# Run all validators
|
|
self._validate_vlans(config_data)
|
|
self._validate_interfaces(config_data)
|
|
self._validate_routing(config_data)
|
|
self._validate_nat(config_data)
|
|
self._validate_acls(config_data)
|
|
|
|
errors_list = [e.to_dict() for e in self.errors]
|
|
warnings_list = [w.to_dict() for w in self.warnings]
|
|
|
|
return errors_list, warnings_list
|
|
|
|
def _validate_vlans(self, config_data: Dict[str, Any]):
|
|
"""Validate VLAN configuration"""
|
|
vlans = config_data.get("vlans", [])
|
|
vlan_ids = set()
|
|
|
|
for vlan in vlans:
|
|
vlan_id = vlan.get("id")
|
|
|
|
# Check valid VLAN range (1-4094)
|
|
if not (1 <= vlan_id <= 4094):
|
|
self.errors.append(
|
|
ValidationError(f"VLAN {vlan_id} outside valid range (1-4094)")
|
|
)
|
|
|
|
# Check duplicates
|
|
if vlan_id in vlan_ids:
|
|
self.errors.append(
|
|
ValidationError(f"Duplicate VLAN {vlan_id}")
|
|
)
|
|
vlan_ids.add(vlan_id)
|
|
|
|
# Check name
|
|
name = vlan.get("name", "")
|
|
if len(name) > 32:
|
|
self.warnings.append(
|
|
ValidationError(f"VLAN {vlan_id} name too long (max 32 chars)", "warning")
|
|
)
|
|
|
|
def _validate_interfaces(self, config_data: Dict[str, Any]):
|
|
"""Validate interface configuration"""
|
|
interfaces = config_data.get("interfaces", [])
|
|
vlans = {v.get("id") for v in config_data.get("vlans", [])}
|
|
ip_addresses = set()
|
|
|
|
for iface in interfaces:
|
|
iface_name = iface.get("name")
|
|
iface_type = iface.get("type", "access")
|
|
|
|
# Validate VLAN assignment
|
|
if iface_type == "access":
|
|
vlan = iface.get("vlan")
|
|
if vlan and vlan not in vlans:
|
|
self.errors.append(
|
|
ValidationError(f"Interface {iface_name} references undefined VLAN {vlan}")
|
|
)
|
|
|
|
# Validate trunk VLAN config
|
|
elif iface_type == "trunk":
|
|
trunk_vlans = iface.get("trunk_vlans", [])
|
|
for vlan in trunk_vlans:
|
|
if vlan not in vlans:
|
|
self.warnings.append(
|
|
ValidationError(
|
|
f"Interface {iface_name} trunk includes undefined VLAN {vlan}",
|
|
"warning"
|
|
)
|
|
)
|
|
|
|
# Validate IP address
|
|
ip_addr = iface.get("ip_address")
|
|
if ip_addr:
|
|
try:
|
|
# Parse CIDR notation
|
|
network = ipaddress.ip_network(ip_addr, strict=False)
|
|
|
|
# Check for overlaps
|
|
for existing_ip in ip_addresses:
|
|
existing_network = ipaddress.ip_network(existing_ip, strict=False)
|
|
if network.overlaps(existing_network):
|
|
self.errors.append(
|
|
ValidationError(f"IP {ip_addr} overlaps with {existing_ip}")
|
|
)
|
|
|
|
ip_addresses.add(ip_addr)
|
|
except ValueError:
|
|
self.errors.append(
|
|
ValidationError(f"Invalid IP address: {ip_addr}")
|
|
)
|
|
|
|
def _validate_routing(self, config_data: Dict[str, Any]):
|
|
"""Validate static routes"""
|
|
routes = config_data.get("routes", [])
|
|
|
|
for route in routes:
|
|
destination = route.get("destination")
|
|
gateway = route.get("gateway")
|
|
|
|
try:
|
|
ipaddress.ip_network(destination, strict=False)
|
|
except ValueError:
|
|
self.errors.append(
|
|
ValidationError(f"Invalid route destination: {destination}")
|
|
)
|
|
|
|
try:
|
|
ipaddress.ip_address(gateway)
|
|
except ValueError:
|
|
self.errors.append(
|
|
ValidationError(f"Invalid gateway IP: {gateway}")
|
|
)
|
|
|
|
def _validate_nat(self, config_data: Dict[str, Any]):
|
|
"""Validate NAT configuration"""
|
|
nat = config_data.get("nat")
|
|
if not nat:
|
|
return
|
|
|
|
inside_iface = nat.get("inside_interface")
|
|
outside_iface = nat.get("outside_interface")
|
|
|
|
if not inside_iface or not outside_iface:
|
|
self.errors.append(
|
|
ValidationError("NAT requires both inside and outside interfaces")
|
|
)
|
|
|
|
def _validate_acls(self, config_data: Dict[str, Any]):
|
|
"""Validate ACL configuration"""
|
|
acls = config_data.get("acls", [])
|
|
|
|
for acl in acls:
|
|
rules = acl.get("rules", [])
|
|
|
|
if not rules:
|
|
self.warnings.append(
|
|
ValidationError("ACL with no rules", "warning")
|
|
)
|