Plugin Validation and Versioning

Purpose and Scope

This document explains the validation and versioning mechanisms that ensure plugin integrity, compatibility, and security in WSHawk's plugin system. It covers the PluginMetadata structure, semantic versioning requirements, compatibility checking algorithms, checksum-based integrity validation, and duplicate detection policies.

For information about creating plugins, see Creating Payload Plugins, Creating Detector Plugins, and Creating Protocol Plugins. For the overall plugin architecture, see Plugin Architecture Overview.


Plugin Metadata Structure

Every plugin must implement the get_metadata() method from PluginBase to return a PluginMetadata instance. This metadata is used for validation, versioning, compatibility checking, and tracking.

PluginMetadata Fields

The PluginMetadata dataclass defines the following fields:

| Field | Type | Required | Default | Description |
|-------|------|----------|---------|-------------|
| name | str | Yes | - | Unique plugin identifier |
| version | str | Yes | - | Semantic version (e.g., "1.0.0") |
| description | str | Yes | - | Human-readable description |
| author | str | No | "Regaan" | Plugin author name |
| requires | List[str] | No | None | Python package dependencies |
| min_wshawk_version | str | No | "2.0.0" | Minimum WSHawk version required |
| checksum | str | No | "" | SHA-256 integrity checksum |

The PluginMetadata class provides serialization methods:

  • to_dict(): Convert to dictionary for JSON export
  • from_dict(data): Instantiate from dictionary

Sources: wshawk/plugin_system.py:17-33
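
As a rough illustration of the field table and the two serialization methods, the following is a minimal stand-in dataclass. The class name PluginMetadataSketch and the method bodies are assumptions made for this example; the actual implementation in wshawk/plugin_system.py may differ.

```python
from dataclasses import asdict, dataclass
from typing import Any, Dict, List, Optional


@dataclass
class PluginMetadataSketch:
    """Hypothetical stand-in mirroring the documented PluginMetadata fields."""

    name: str
    version: str
    description: str
    author: str = "Regaan"
    requires: Optional[List[str]] = None
    min_wshawk_version: str = "2.0.0"
    checksum: str = ""

    def to_dict(self) -> Dict[str, Any]:
        # asdict() converts the dataclass into a plain, JSON-friendly dict
        return asdict(self)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "PluginMetadataSketch":
        # Unexpected keys raise TypeError, so malformed input fails early
        return cls(**data)


meta = PluginMetadataSketch(
    name="custom_sql",
    version="2.1.3",
    description="Advanced SQL injection payloads",
)
# Round trip: dataclass -> dict -> dataclass
restored = PluginMetadataSketch.from_dict(meta.to_dict())
```

Only the three required fields are passed here; the remaining fields take the defaults listed in the table.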

Metadata Implementation Example

from wshawk.plugin_system import PayloadPlugin, PluginMetadata

class MyPlugin(PayloadPlugin):
    def get_metadata(self) -> PluginMetadata:
        return PluginMetadata(
            name="custom_sql",
            version="2.1.3",
            description="Advanced SQL injection payloads",
            author="Security Team",
            requires=["sqlparse>=0.4.0"],
            min_wshawk_version="2.0.0"
        )

Sources: wshawk/plugin_system.py:35-50, wshawk/plugin_system.py:443-465


Diagram: Plugin Validation Pipeline

graph TB
    subgraph "Plugin Discovery Phase"
        PluginDir["plugins/ directory"]
        ScanFiles["_scan_available_plugins()"]
        FileList["_available_plugins Dict[name->path]"]
        Checksum["Calculate SHA-256<br/>_plugin_checksums"]
    end
    
    subgraph "Lazy Loading Trigger"
        LoadRequest["get_payloads() or<br/>run_detectors() or<br/>load_all_plugins()"]
        LazyLoad["_load_plugin_lazy(plugin_name)"]
        LockCheck["Threading Lock<br/>Check _loaded_plugins"]
    end
    
    subgraph "Validation Phase"
        Import["importlib.import_module()"]
        GetMetadata["plugin.get_metadata()"]
        ValidatePlugin["_validate_plugin()"]
        CheckName["name and version<br/>not empty?"]
        CheckVersion["_is_valid_version()<br/>semver format?"]
        CheckCompat["_is_compatible_version()<br/>min_wshawk_version?"]
    end
    
    subgraph "Registration Phase"
        RegInternal["_register_plugin_internal()"]
        DupCheck["Check _payload_plugins<br/>_detector_plugins<br/>_protocol_plugins"]
        OverridePolicy["allow_override<br/>flag?"]
        RegisterSuccess["Add to registry<br/>Add to _plugin_metadata<br/>Add to _loaded_plugins"]
    end
    
    subgraph "Plugin Registries"
        PayloadReg["_payload_plugins:<br/>Dict[str, PayloadPlugin]"]
        DetectorReg["_detector_plugins:<br/>Dict[str, DetectorPlugin]"]
        ProtocolReg["_protocol_plugins:<br/>Dict[str, ProtocolPlugin]"]
    end
    
    PluginDir --> ScanFiles
    ScanFiles --> FileList
    ScanFiles --> Checksum
    
    LoadRequest --> LazyLoad
    LazyLoad --> LockCheck
    LockCheck -->|not loaded| Import
    LockCheck -->|already loaded| RegisterSuccess
    
    Import --> GetMetadata
    GetMetadata --> ValidatePlugin
    ValidatePlugin --> CheckName
    CheckName -->|fail| Reject["Validation Failed<br/>Log Error"]
    CheckName -->|pass| CheckVersion
    CheckVersion -->|fail| Reject
    CheckVersion -->|pass| CheckCompat
    CheckCompat -->|fail| Reject
    CheckCompat -->|pass| RegInternal
    
    RegInternal --> DupCheck
    DupCheck -->|duplicate found| OverridePolicy
    OverridePolicy -->|allow_override=False| Reject
    OverridePolicy -->|allow_override=True| RegisterSuccess
    DupCheck -->|no duplicate| RegisterSuccess
    
    RegisterSuccess --> PayloadReg
    RegisterSuccess --> DetectorReg
    RegisterSuccess --> ProtocolReg

Sources: wshawk/plugin_system.py:141-156, wshawk/plugin_system.py:158-201, wshawk/plugin_system.py:203-235, wshawk/plugin_system.py:263-313


Semantic Versioning Validation

WSHawk enforces strict semantic versioning (semver) for all plugins. Version strings must follow the MAJOR.MINOR.PATCH format (e.g., "2.1.3").

Version Validation Logic

The _is_valid_version() method validates version strings:

def _is_valid_version(self, version: str) -> bool:
    """Check if version string is valid (semver)"""
    try:
        parts = version.split('.')
        return len(parts) == 3 and all(p.isdigit() for p in parts)
    except (ValueError, AttributeError):
        return False

Validation Rules:

  • Must contain exactly 3 parts separated by dots
  • Each part must be a non-negative integer
  • No pre-release identifiers (e.g., "1.0.0-alpha") allowed
  • No build metadata (e.g., "1.0.0+build123") allowed

Invalid Examples:

  • "1.0" - Missing patch version
  • "1.0.0.1" - Too many parts
  • "1.0.x" - Non-numeric component
  • "v1.0.0" - Contains prefix
  • "1.0.0-beta" - Contains pre-release identifier

Sources: wshawk/plugin_system.py:237-243
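
To make the rules concrete, the check can be exercised outside the PluginManager. The module-level function is_valid_version below is a standalone copy of the documented logic written for illustration; it is not part of the WSHawk API.

```python
def is_valid_version(version: str) -> bool:
    """Standalone copy of the documented check: exactly three all-digit parts."""
    try:
        parts = version.split(".")
        return len(parts) == 3 and all(p.isdigit() for p in parts)
    except AttributeError:
        # Non-string input (for example None) has no .split()
        return False


samples = [
    "2.1.3",           # valid
    "1.0",             # missing patch version
    "1.0.0.1",         # too many parts
    "1.0.x",           # non-numeric component
    "v1.0.0",          # prefix
    "1.0.0-beta",      # pre-release identifier
    "1.0.0+build123",  # build metadata
]
results = {s: is_valid_version(s) for s in samples}
for version, ok in results.items():
    print(f"{version!r:18} -> {'valid' if ok else 'invalid'}")
```

One consequence of relying on str.isdigit() is that a string such as "01.0.0" passes this check, even though the semver specification disallows leading zeros in numeric identifiers.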


Diagram: Version Compatibility Checking

graph TD
    Start["Plugin metadata:<br/>min_wshawk_version"]
    Current["Current WSHawk version:<br/>e.g., '3.0.0'"]
    
    Parse["Parse versions:<br/>req_parts = [int(p) for p in required.split('.')]<br/>cur_parts = [int(p) for p in current.split('.')]"]
    
    CheckMajor{"req_parts[0] ==<br/>cur_parts[0]?"}
    CheckMinor{"cur_parts[1] >=<br/>req_parts[1]?"}
    
    Compatible["Compatible:<br/>Return True"]
    Incompatible["Incompatible:<br/>Return False<br/>Plugin rejected"]
    
    Start --> Parse
    Current --> Parse
    Parse --> CheckMajor
    
    CheckMajor -->|"No<br/>(Major mismatch)"| Incompatible
    CheckMajor -->|"Yes<br/>(Major match)"| CheckMinor
    
    CheckMinor -->|"No<br/>(Minor too old)"| Incompatible
    CheckMinor -->|"Yes<br/>(Minor sufficient)"| Compatible
    
    style Compatible fill:#e8f5e9
    style Incompatible fill:#ffebee

Sources: wshawk/plugin_system.py:245-261

Compatibility Algorithm

The _is_compatible_version() method implements the following compatibility rules:

def _is_compatible_version(self, required: str, current: str) -> bool:
    """Check if current version meets requirement"""
    try:
        req_parts = [int(p) for p in required.split('.')]
        cur_parts = [int(p) for p in current.split('.')]
        
        # Major version must match
        if req_parts[0] != cur_parts[0]:
            return False
        
        # Minor version must be >= required
        if cur_parts[1] < req_parts[1]:
            return False
        
        return True
    except (ValueError, AttributeError, IndexError):
        return False

Compatibility Rules:

| Rule | Description | Example |
|------|-------------|---------|
| Major Match | Major versions must be identical | Plugin requires "2.x.x" → WSHawk "2.5.0" ✓, "3.0.0" ✗ |
| Minor Minimum | Current minor ≥ required minor | Plugin requires "2.1.x" → WSHawk "2.3.0" ✓, "2.0.5" ✗ |
| Patch Ignored | Patch version not checked | Plugin requires "2.1.0" → WSHawk "2.1.9" ✓ |

Rationale:

  • Major version changes indicate breaking API changes
  • Minor version increases add features while maintaining backward compatibility
  • Patch versions are bug fixes and don't affect compatibility

Sources: wshawk/plugin_system.py:245-261, wshawk/plugin_system.py:226-229
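
The rule table can be checked against a standalone copy of the algorithm. The free function is_compatible_version below is written for illustration and is not part of the WSHawk API; the version pairs are taken from the table, plus one extra case showing that the patch component is ignored in both directions.

```python
def is_compatible_version(required: str, current: str) -> bool:
    """Standalone copy of the documented rule: same major, current minor >= required."""
    try:
        req_parts = [int(p) for p in required.split(".")]
        cur_parts = [int(p) for p in current.split(".")]
        if req_parts[0] != cur_parts[0]:
            return False  # major mismatch
        if cur_parts[1] < req_parts[1]:
            return False  # current minor is older than required
        return True       # patch component is never compared
    except (ValueError, AttributeError, IndexError):
        return False


# (required, current) pairs
cases = [
    ("2.0.0", "2.5.0"),
    ("2.0.0", "3.0.0"),
    ("2.1.0", "2.3.0"),
    ("2.1.0", "2.0.5"),
    ("2.1.0", "2.1.9"),
    ("2.1.5", "2.1.0"),  # required patch is higher, still accepted
]
for required, current in cases:
    verdict = "compatible" if is_compatible_version(required, current) else "rejected"
    print(f"requires {required}, running {current}: {verdict}")
```

Because the major versions must match exactly, a plugin that keeps the default min_wshawk_version of "2.0.0" would be rejected under this rule by any 3.x release.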


Integrity Validation with Checksums

The PluginManager calculates SHA-256 checksums during plugin discovery to detect file modifications and ensure integrity.

Checksum Calculation

During the _scan_available_plugins() phase, checksums are calculated:

for filename in os.listdir(self.plugin_dir):
    if filename.endswith('.py') and not filename.startswith('_'):
        plugin_name = filename[:-3]
        plugin_path = os.path.join(self.plugin_dir, filename)
        
        # Calculate checksum
        with open(plugin_path, 'rb') as f:
            checksum = hashlib.sha256(f.read()).hexdigest()
        
        self._available_plugins[plugin_name] = plugin_path
        self._plugin_checksums[plugin_name] = checksum

Checksum Storage

The PluginManager maintains two dictionaries:

| Dictionary | Type | Purpose |
|------------|------|---------|
| _available_plugins | Dict[str, str] | Maps plugin name → file path |
| _plugin_checksums | Dict[str, str] | Maps plugin name → SHA-256 checksum |

Checksum Use Cases

  1. Tamper Detection: Compare stored checksums against recalculated values to detect modifications
  2. Cache Invalidation: Trigger plugin reload when checksum changes
  3. Audit Logging: Track plugin file modifications over time
  4. Security Scanning: Verify plugins haven't been altered by attackers

Note: The current implementation calculates checksums but does not automatically validate them on reload, so the use cases above are not enforced yet. This is an extension point for future security enhancements.

Sources: wshawk/plugin_system.py:141-156, wshawk/plugin_system.py:8-9
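
Since checksum validation on reload is not implemented, the following is only a sketch of how the tamper-detection use case could work. The helpers file_sha256 and is_unmodified, and the recorded dictionary standing in for _plugin_checksums, are hypothetical names introduced for this example.

```python
import hashlib
import os
import tempfile
from typing import Dict


def file_sha256(path: str) -> str:
    """Hash a file the same way the discovery scan does (SHA-256 of the raw bytes)."""
    with open(path, "rb") as f:
        return hashlib.sha256(f.read()).hexdigest()


def is_unmodified(name: str, path: str, recorded: Dict[str, str]) -> bool:
    """Hypothetical helper: compare the file's current hash with the recorded one."""
    expected = recorded.get(name)
    return expected is not None and file_sha256(path) == expected


with tempfile.TemporaryDirectory() as plugin_dir:
    path = os.path.join(plugin_dir, "demo_plugin.py")
    with open(path, "w") as f:
        f.write("PAYLOADS = ['a']\n")

    # Stand-in for the checksums recorded during discovery
    recorded = {"demo_plugin": file_sha256(path)}
    before = is_unmodified("demo_plugin", path, recorded)

    # Simulate a modification made after discovery
    with open(path, "a") as f:
        f.write("PAYLOADS.append('b')\n")
    after = is_unmodified("demo_plugin", path, recorded)

print(f"unmodified before edit: {before}, after edit: {after}")
```

A check like this only detects changes made after the checksum was recorded; it cannot tell whether the file was trustworthy at discovery time.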


Duplicate Detection and Override Policies

The plugin system prevents accidental plugin replacement through duplicate detection and configurable override policies.

Duplicate Detection Logic

The _register_plugin_internal() method checks for duplicates before registration:

def _register_plugin_internal(self, plugin: PluginBase) -> bool:
    metadata = plugin.get_metadata()
    name = metadata.name
    
    # Check for duplicates
    if isinstance(plugin, PayloadPlugin):
        if name in self._payload_plugins:
            if not self.allow_override:
                print(f"[ERROR] Duplicate payload plugin: {name}")
                return False
            else:
                print(f"[WARNING] Overriding payload plugin: {name}")
        
        self._payload_plugins[name] = plugin
        # ...

Override Policies

The PluginManager.allow_override flag controls duplicate handling:

| allow_override | Behavior | Use Case |
|------------------|----------|----------|
| False (default) | Reject duplicate plugins with error | Production environments - prevent accidental overwrites |
| True | Allow duplicate plugins with warning | Development/testing - hot reload during development |

Setting Override Policy

# Default: strict mode (no overrides)
manager = PluginManager()

# Development mode: allow overrides
manager = PluginManager()
manager.allow_override = True

Duplicate Detection by Plugin Type

Duplicates are checked separately for each plugin type:

graph LR
    Plugin["Plugin Instance"]
    
    CheckType{"isinstance<br/>check"}
    
    PayloadCheck["Check name in<br/>_payload_plugins"]
    DetectorCheck["Check name in<br/>_detector_plugins"]
    ProtocolCheck["Check name in<br/>_protocol_plugins"]
    
    Policy{"allow_override?"}
    
    Error["Return False<br/>Log [ERROR]"]
    Warning["Return True<br/>Log [WARNING]<br/>Override existing"]
    Success["Return True<br/>Log [OK]<br/>Register new"]
    
    Plugin --> CheckType
    
    CheckType -->|PayloadPlugin| PayloadCheck
    CheckType -->|DetectorPlugin| DetectorCheck
    CheckType -->|ProtocolPlugin| ProtocolCheck
    
    PayloadCheck -->|duplicate| Policy
    DetectorCheck -->|duplicate| Policy
    ProtocolCheck -->|duplicate| Policy
    
    PayloadCheck -->|new| Success
    DetectorCheck -->|new| Success
    ProtocolCheck -->|new| Success
    
    Policy -->|False| Error
    Policy -->|True| Warning

Sources: wshawk/plugin_system.py:263-313, wshawk/plugin_system.py:130
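
The policy can be modelled in a few lines. MiniRegistry below is a hypothetical, simplified model written for this page: it keeps one dictionary per plugin type and applies the allow_override flag, but it skips metadata validation and is not the PluginManager API. The constructor argument is a convenience of the sketch; the real manager exposes allow_override as an attribute.

```python
from typing import Dict


class MiniRegistry:
    """Hypothetical, simplified model of per-type duplicate detection."""

    def __init__(self, allow_override: bool = False) -> None:
        self.allow_override = allow_override
        # One dict per plugin type, so names are only compared within a type
        self._registries: Dict[str, Dict[str, object]] = {
            "payload": {},
            "detector": {},
            "protocol": {},
        }

    def register(self, kind: str, name: str, plugin: object) -> bool:
        registry = self._registries[kind]
        if name in registry:
            if not self.allow_override:
                print(f"[ERROR] Duplicate {kind} plugin: {name}")
                return False
            print(f"[WARNING] Overriding {kind} plugin: {name}")
        registry[name] = plugin
        return True


strict = MiniRegistry()
first = strict.register("payload", "custom_xss", "v1")
second = strict.register("payload", "custom_xss", "v2")  # rejected, v1 is kept

dev = MiniRegistry(allow_override=True)
dev.register("payload", "custom_xss", "v1")
replaced = dev.register("payload", "custom_xss", "v2")   # accepted, v2 replaces v1
```

In this sketch the same name could be registered under a different type without conflict. Whether the real manager accepts that depends on how it keys its shared metadata, which is not confirmed here.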


Validation Error Handling

The validation pipeline provides detailed error logging at each stage to aid debugging:

Validation Failure Points

| Validation Stage | Method | Error Message | Consequence |
|------------------|--------|---------------|-------------|
| Metadata Missing | _validate_plugin() | "Plugin missing name or version" | Plugin rejected |
| Invalid Version | _is_valid_version() | "Invalid version format: {version}" | Plugin rejected |
| Incompatible WSHawk | _is_compatible_version() | "Plugin requires WSHawk {min_version}" | Plugin rejected |
| Duplicate Name | _register_plugin_internal() | "Duplicate {type} plugin: {name}" | Plugin rejected (unless allow_override=True) |
| Import Error | _load_plugin_lazy() | "Failed to load plugin {name}: {error}" | Plugin not loaded |
| Registration Failure | _register_plugin_internal() | "Plugin {name} registration failed" | Plugin not loaded |
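
The first three rows are checked in order, so a plugin with several problems reports only the first one it hits. The sketch below reproduces that ordering in a single hypothetical function, validation_error, which returns the message text from the table or None on success. The function name and the CURRENT_VERSION value are assumptions made for illustration.

```python
from typing import Optional

CURRENT_VERSION = "2.0.0"  # assumed running WSHawk version, for illustration only


def validation_error(name: str, version: str, min_wshawk_version: str,
                     current: str = CURRENT_VERSION) -> Optional[str]:
    """Return the first failing stage's message, or None if every check passes."""
    # Stage 1: required metadata present
    if not name or not version:
        return "Plugin missing name or version"

    # Stage 2: version is three all-digit parts
    parts = version.split(".")
    if not (len(parts) == 3 and all(p.isdigit() for p in parts)):
        return f"Invalid version format: {version}"

    # Stage 3: same major version, current minor >= required minor
    try:
        req = [int(p) for p in min_wshawk_version.split(".")]
        cur = [int(p) for p in current.split(".")]
        compatible = req[0] == cur[0] and cur[1] >= req[1]
    except (ValueError, IndexError):
        compatible = False
    if not compatible:
        return f"Plugin requires WSHawk {min_wshawk_version}"

    return None


# Two problems at once: only the version format error is reported
print(validation_error("invalid", "1.0", "4.0.0"))
```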

Thread Safety

Validation operations are protected by a threading lock:

def _load_plugin_lazy(self, plugin_name: str) -> bool:
    with self._lock:
        if plugin_name in self._loaded_plugins:
            return True
        # ... validation logic

The threading.Lock() ensures:

  • Atomic validation and registration operations
  • Prevention of race conditions when loading plugins concurrently
  • Safe operation in multi-threaded environments (e.g., web dashboard)

Sources: wshawk/plugin_system.py:203-235, wshawk/plugin_system.py:158-201, wshawk/plugin_system.py:125-127
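
The effect of the lock can be demonstrated with a small model. LazyLoader below is a hypothetical stand-in for the lazy-loading path: a counter takes the place of the import, validation, and registration work, and several threads request the same plugin at once.

```python
import threading
from typing import Set


class LazyLoader:
    """Hypothetical model of lock-protected lazy loading."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._loaded: Set[str] = set()
        self.load_count = 0  # how many times the expensive path actually ran

    def load(self, plugin_name: str) -> bool:
        with self._lock:
            # Check and update happen under the same lock, so they are atomic
            if plugin_name in self._loaded:
                return True
            self.load_count += 1  # stands in for import + validate + register
            self._loaded.add(plugin_name)
            return True


loader = LazyLoader()
threads = [
    threading.Thread(target=loader.load, args=("custom_xss",))
    for _ in range(8)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"load requests: {len(threads)}, actual loads: {loader.load_count}")
```

Holding the lock across both the membership check and the registration is what prevents two threads from loading the same plugin; checking outside the lock and registering inside it would reintroduce the race.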


Diagram: Plugin Metadata and Validation Rules

graph TB
    subgraph "PluginMetadata Dataclass"
        Name["name: str<br/>(Required)"]
        Version["version: str<br/>(Required)<br/>Format: X.Y.Z"]
        Desc["description: str<br/>(Required)"]
        Author["author: str<br/>(Default: 'Regaan')"]
        Requires["requires: List[str]<br/>(Optional)<br/>Python packages"]
        MinWSHawk["min_wshawk_version: str<br/>(Default: '2.0.0')<br/>Format: X.Y.Z"]
        Checksum["checksum: str<br/>(Optional)<br/>SHA-256"]
    end
    
    subgraph "Validation Rules"
        Rule1["Name must be non-empty string"]
        Rule2["Version must be semver<br/>3 numeric parts: X.Y.Z"]
        Rule3["min_wshawk_version must match<br/>current WSHawk major version<br/>and minor >= required"]
        Rule4["All fields must be serializable<br/>to JSON via to_dict()"]
    end
    
    subgraph "Validation Methods"
        ValidatePlugin["_validate_plugin(plugin)<br/>Main validation entry point"]
        ValidVersion["_is_valid_version(version)<br/>Check semver format"]
        CompatVersion["_is_compatible_version(required, current)<br/>Check WSHawk compatibility"]
    end
    
    Name --> Rule1
    Version --> Rule2
    MinWSHawk --> Rule3
    
    Rule1 --> ValidatePlugin
    Rule2 --> ValidVersion
    Rule3 --> CompatVersion
    
    ValidVersion --> ValidatePlugin
    CompatVersion --> ValidatePlugin
    
    ValidatePlugin -->|"Pass"| Register["_register_plugin_internal()"]
    ValidatePlugin -->|"Fail"| Reject["Plugin Rejected<br/>Error Logged"]

Sources: wshawk/plugin_system.py:17-33, wshawk/plugin_system.py:203-235, wshawk/plugin_system.py:237-261


Security Considerations

Validation as Security Layer

Plugin validation serves multiple security purposes:

  1. Code Integrity: SHA-256 checksums recorded at discovery provide a baseline for detecting unauthorized modifications
  2. Version Pinning: Prevents loading plugins with known vulnerabilities in older versions
  3. Namespace Isolation: Unique plugin names prevent naming collisions and shadowing attacks
  4. Compatibility Enforcement: Prevents loading plugins that may exploit API changes

Security Best Practices

| Practice | Implementation | Rationale |
|----------|----------------|-----------|
| Strict Version Validation | Reject non-semver strings | Keeps malformed or unexpected strings out of version fields |
| Checksum Verification | SHA-256 file hashing | Provides a baseline for detecting tampering with plugin files |
| Thread-Safe Loading | threading.Lock() | Prevents race conditions in validation |
| Metadata Validation | Required fields enforcement | Ensures plugin identity and traceability |
| Override Control | allow_override=False default | Prevents accidental plugin replacement |

Plugin Sandboxing Limitations

Current Implementation:

  • Plugins run in the same Python process as WSHawk
  • No filesystem or network restrictions
  • Full access to WSHawk internals

Security Note: Only load plugins from trusted sources. Malicious plugins can:

  • Execute arbitrary Python code
  • Access sensitive data in memory
  • Modify WSHawk behavior
  • Exfiltrate scan results

Future Security Enhancements

The following security features are extension points for future development:

  1. Checksum Validation: Compare stored checksums on each load to detect modifications
  2. Signature Verification: Cryptographically sign plugins with author keys
  3. Capability-Based Security: Declare required permissions in metadata (e.g., network access, filesystem access)
  4. Process Isolation: Run untrusted plugins in separate processes with restricted capabilities
  5. Plugin Auditing: Log all plugin loads, method invocations, and results for forensic analysis

Sources: wshawk/plugin_system.py:98-107, docs/DOCKER.md:206-226


Plugin Information Query Methods

The PluginManager provides methods to query loaded and available plugins:

list_plugins()

def list_plugins(self, loaded_only: bool = False) -> Dict:
    """
    List plugins
    
    Args:
        loaded_only: Only show loaded plugins
        
    Returns:
        Plugin information
    """

Return Structure (loaded_only=False):

{
    'available': ['plugin1', 'plugin2'],        # Discovered in the plugin directory
    'loaded': ['plugin1'],                       # Successfully loaded
    'payload_plugins': ['custom_xss'],           # Loaded payload plugins
    'detector_plugins': ['nosql_detector'],      # Loaded detector plugins
    'protocol_plugins': ['msgpack_handler']      # Loaded protocol plugins
}

get_plugin_info()

def get_plugin_info(self, plugin_name: str) -> Optional[Dict]:
    """Get detailed plugin information"""
    if plugin_name in self._plugin_metadata:
        return self._plugin_metadata[plugin_name].to_dict()
    return None

Return Structure:

{
    'name': 'custom_xss',
    'version': '1.0.0',
    'description': 'Custom XSS payloads for modern frameworks',
    'author': 'WSHawk Team',
    'requires': None,
    'min_wshawk_version': '2.0.0',
    'checksum': ''
}

Sources: wshawk/plugin_system.py:409-438


Complete Validation Example

This example demonstrates the complete validation flow:

from typing import List

from wshawk.plugin_system import (
    PluginManager, PayloadPlugin, PluginMetadata
)

# Create plugin with valid metadata
class ValidPlugin(PayloadPlugin):
    def get_metadata(self) -> PluginMetadata:
        return PluginMetadata(
            name="test_plugin",
            version="1.2.3",              # Valid semver
            description="Test plugin",
            author="Test Author",
            min_wshawk_version="2.0.0"   # Compatible with 2.x
        )
    
    def get_payloads(self, vuln_type: str) -> List[str]:
        return ["test_payload"]

# Initialize manager
manager = PluginManager()

# Register plugin (triggers validation)
success = manager.register_plugin(ValidPlugin())
# Output: [OK] Registered payload plugin: test_plugin v1.2.3

# Query plugin info
info = manager.get_plugin_info("test_plugin")
print(f"Loaded: {info['name']} v{info['version']}")

# Invalid plugin example
class InvalidPlugin(PayloadPlugin):
    def get_metadata(self) -> PluginMetadata:
        return PluginMetadata(
            name="invalid",
            version="1.0",               # Invalid: missing patch version
            description="Invalid plugin",
            min_wshawk_version="4.0.0"  # Incompatible: major mismatch
        )
    
    def get_payloads(self, vuln_type: str) -> List[str]:
        return []

# Attempt registration (will fail)
success = manager.register_plugin(InvalidPlugin())
# Output: [ERROR] Invalid version format: 1.0
# Returns: False

Sources: wshawk/plugin_system.py:320-333, wshawk/plugin_system.py:443-495


Related Documentation