Creating Detector Plugins

Creating Detector Plugins

The following files were used as context for generating this wiki page:

This document explains how to create custom detector plugins for WSHawk. Detector plugins extend the vulnerability detection capabilities by implementing custom detection logic for new vulnerability types or improving detection accuracy for existing types.

For information about the overall plugin architecture and PluginManager, see Plugin Architecture Overview. For creating payload plugins, see Creating Payload Plugins. For protocol-specific handlers, see Creating Protocol Plugins.


Overview

Detector plugins analyze server responses to identify vulnerabilities. Each plugin implements custom detection logic and returns a confidence score with a description. The plugin system automatically integrates detector plugins into the scanning pipeline, running them against all collected responses.

Sources: wshawk/plugin_system.py:64-84


DetectorPlugin Base Class

Class Hierarchy

graph TB
    PluginBase["PluginBase (ABC)"]
    DetectorPlugin["DetectorPlugin"]
    CustomDetector["Custom Detector<br/>Implementation"]
    
    PluginBase -->|"inherits"| DetectorPlugin
    DetectorPlugin -->|"inherits"| CustomDetector
    
    PluginBase -->|"requires"| get_metadata["get_metadata()<br/>→ PluginMetadata"]
    DetectorPlugin -->|"requires"| detect["detect(response, payload, context)<br/>→ (bool, str, str)"]
    DetectorPlugin -->|"optional"| get_supported_types["get_supported_types()<br/>→ List[str]"]

Diagram: DetectorPlugin Class Hierarchy and Required Methods

The DetectorPlugin class extends PluginBase and requires implementation of two methods:

  • get_metadata() - Returns plugin metadata for validation
  • detect() - Analyzes responses for vulnerabilities

Sources: wshawk/plugin_system.py:35-51, wshawk/plugin_system.py:64-84


Required Methods

get_metadata() Method

All detector plugins must implement get_metadata() to return a PluginMetadata instance:

def get_metadata(self) -> PluginMetadata:
    return PluginMetadata(
        name="detector_name",           # Unique identifier
        version="1.0.0",                # Semver format
        description="Description",       # Human-readable
        author="Author Name",           # Optional
        min_wshawk_version="2.0.0"     # Minimum WSHawk version
    )

| Field | Type | Required | Description | |-------|------|----------|-------------| | name | str | Yes | Unique plugin identifier (no spaces) | | version | str | Yes | Semantic version (e.g., "1.0.0") | | description | str | Yes | Human-readable description | | author | str | No | Plugin author (default: "Regaan") | | min_wshawk_version | str | No | Minimum compatible WSHawk version | | requires | List[str] | No | Python package dependencies |

Sources: wshawk/plugin_system.py:18-33, wshawk/plugin_system.py:470-476

detect() Method

The core detection logic is implemented in the detect() method:

def detect(self, response: str, payload: str, context: Dict = None) -> tuple[bool, str, str]:
    """
    Args:
        response: Server WebSocket response message
        payload: Original payload that was sent
        context: Additional context (headers, timing, etc.)
    
    Returns:
        Tuple of (is_vulnerable, confidence_level, description)
    """

Return Tuple Structure:

| Position | Type | Values | Description | |----------|------|--------|-------------| | 0 | bool | True/False | Whether vulnerability detected | | 1 | str | "LOW", "MEDIUM", "HIGH" | Confidence level | | 2 | str | Any string | Human-readable description |

Sources: wshawk/plugin_system.py:67-80


Detection Logic Implementation

Basic Detection Pattern

graph LR
    Input["Response + Payload<br/>+ Context"]
    Parse["Parse Response<br/>Extract Indicators"]
    Match["Match Patterns<br/>Count Indicators"]
    Score["Calculate<br/>Confidence"]
    Return["Return Tuple<br/>(bool, str, str)"]
    
    Input --> Parse
    Parse --> Match
    Match --> Score
    Score --> Return

Diagram: Typical Detector Plugin Flow

A typical detector implementation follows this pattern:

  1. Parse Response: Extract relevant data from the response string
  2. Pattern Matching: Check for vulnerability indicators
  3. Confidence Scoring: Determine confidence based on indicator count
  4. Result Construction: Build descriptive message and return tuple

Sources: wshawk/plugin_system.py:478-491

Confidence Level Guidelines

| Confidence | Criteria | Example | |------------|----------|---------| | "HIGH" | Multiple strong indicators present | 3+ specific error messages matching vulnerability | | "MEDIUM" | Single strong indicator or multiple weak indicators | 1-2 indicators, or ambiguous error message | | "LOW" | Weak indicators or uncertain detection | Single generic indicator, possible false positive |

Sources: wshawk/plugin_system.py:486-490


Context Dictionary

The context parameter provides additional information about the request/response cycle:

graph TB
    context["context: Dict"]
    headers["headers: Dict<br/>Response headers"]
    timing["timing: float<br/>Response time (ms)"]
    status["status: int<br/>WebSocket close code"]
    metadata["metadata: Dict<br/>Server fingerprint"]
    
    context --> headers
    context --> timing
    context --> status
    context --> metadata

Diagram: Context Dictionary Structure

The context dictionary may contain:

  • headers: HTTP or WebSocket headers
  • timing: Response timing information
  • status: Connection status or close codes
  • metadata: Server fingerprint from MessageAnalyzer

Plugins can use this data for advanced detection logic (e.g., timing-based attacks, header analysis).

Sources: wshawk/plugin_system.py:68-79


Complete Example: AdvancedNoSQLDetector

The following example demonstrates a complete detector plugin implementation:

from wshawk.plugin_system import DetectorPlugin, PluginMetadata
from typing import Dict, List

class AdvancedNoSQLDetector(DetectorPlugin):
    """Advanced NoSQL injection detector with context analysis"""
    
    def get_metadata(self) -> PluginMetadata:
        return PluginMetadata(
            name="nosql_detector_advanced",
            version="1.0.0",
            description="Advanced NoSQL injection detector with context analysis",
            author="WSHawk Team",
            min_wshawk_version="2.0.0"
        )
    
    def detect(self, response: str, payload: str, context: Dict = None) -> tuple[bool, str, str]:
        # Define NoSQL-specific indicators
        nosql_indicators = ['mongodb', 'bson', 'query error', '$ne', '$gt', 'mongoose']
        
        # Case-insensitive search
        response_lower = response.lower()
        
        # Count matching indicators
        matches = sum(1 for ind in nosql_indicators if ind in response_lower)
        
        # Determine confidence based on indicator count
        if matches >= 2:
            return (True, "HIGH", f"NoSQL injection detected ({matches} indicators)")
        elif matches == 1:
            return (True, "MEDIUM", "Possible NoSQL injection")
        
        return (False, "LOW", "No NoSQL indicators found")
    
    def get_supported_types(self) -> List[str]:
        return ["nosql", "mongodb", "couchdb"]

Key Implementation Details:

  1. Indicator List: Pre-defined list of strings that suggest NoSQL vulnerabilities
  2. Case Normalization: Convert response to lowercase for reliable matching
  3. Threshold-Based Scoring: Different confidence levels based on indicator count
  4. Descriptive Messages: Include match count in HIGH confidence results
  5. Supported Types: Declare which vulnerability categories this detector handles

Sources: wshawk/plugin_system.py:467-494


Optional Methods

get_supported_types()

Declare which vulnerability types the detector supports:

def get_supported_types(self) -> List[str]:
    return ["nosql", "mongodb", "couchdb"]

This helps the PluginManager organize detectors and enables targeted detection for specific vulnerability categories.

Sources: wshawk/plugin_system.py:82-84, wshawk/plugin_system.py:493-494


Plugin Registration and Execution

Registration Process

graph TB
    Create["Create Plugin<br/>Instance"]
    Register["PluginManager.<br/>register_plugin()"]
    Validate["_validate_plugin()<br/>Check metadata"]
    CheckDup["Check for<br/>Duplicates"]
    Store["Store in<br/>_detector_plugins"]
    
    Create --> Register
    Register --> Validate
    Validate -->|"Valid"| CheckDup
    Validate -->|"Invalid"| Fail["Registration<br/>Failed"]
    CheckDup -->|"No Duplicate"| Store
    CheckDup -->|"Duplicate"| Fail
    Store --> Success["Registration<br/>Success"]

Diagram: Detector Plugin Registration Flow

Registration steps:

  1. Instantiate detector class
  2. Call PluginManager.register_plugin(instance)
  3. Manager validates metadata (name, version, compatibility)
  4. Manager checks for duplicate plugin names
  5. Plugin stored in _detector_plugins dictionary

Sources: wshawk/plugin_system.py:320-333, wshawk/plugin_system.py:263-313

Execution in Scanning Pipeline

graph LR
    Response["Response<br/>Received"]
    Manager["PluginManager.<br/>run_detectors()"]
    LazyLoad["Lazy Load<br/>Plugins"]
    Iterate["Iterate<br/>Detectors"]
    Execute["Execute<br/>detect()"]
    Collect["Collect<br/>Results"]
    
    Response --> Manager
    Manager --> LazyLoad
    LazyLoad --> Iterate
    Iterate --> Execute
    Execute --> Collect

Diagram: Detector Execution in Scan Pipeline

The PluginManager.run_detectors() method:

  1. Lazy loads all available detector plugins
  2. Iterates through registered detectors
  3. Calls detect() on each plugin
  4. Collects positive results with confidence scores
  5. Returns list of detection dictionaries

Sources: wshawk/plugin_system.py:375-407


Detection Result Format

When a detector identifies a vulnerability, the result is structured as:

{
    'plugin': 'nosql_detector_advanced',     # Plugin name
    'version': '1.0.0',                      # Plugin version
    'confidence': 'HIGH',                    # Confidence level
    'description': 'NoSQL injection detected (3 indicators)'  # Description
}

Multiple detectors can identify the same vulnerability with different confidence levels. The scanner aggregates these results during the verification phase.

Sources: wshawk/plugin_system.py:396-403


Error Handling

Detector plugins should handle errors gracefully. The PluginManager wraps each detect() call in a try-except block:

try:
    is_vuln, confidence, description = plugin.detect(response, payload, context)
    if is_vuln:
        results.append({...})
except Exception as e:
    print(f"[ERROR] Detector {plugin.get_name()} failed: {e}")

Best Practices:

  • Validate input parameters before processing
  • Handle missing or malformed responses
  • Return (False, "LOW", "Error occurred") on internal errors
  • Avoid raising unhandled exceptions

Sources: wshawk/plugin_system.py:395-405


Plugin File Structure

Detector plugins are Python files placed in the plugins/ directory:

plugins/
├── README.md
├── advanced_nosql_detector.py
├── custom_xss_detector.py
└── timing_attack_detector.py

Each file can contain multiple detector classes. The PluginManager scans the directory and lazy loads plugins as needed.

Directory Requirements:

  • Files must end with .py
  • Files starting with _ are ignored (e.g., __init__.py)
  • Each file is imported as a module from the plugins package

Sources: wshawk/plugin_system.py:141-156, plugins/README.md:1-37


Validation and Versioning

Metadata Validation

The PluginManager validates detector plugins before registration:

| Validation Check | Requirement | |-----------------|-------------| | Name | Non-empty string, no duplicates | | Version | Semantic versioning (e.g., "1.0.0") | | WSHawk Compatibility | min_wshawk_version ≤ current WSHawk version | | Required Fields | Name and version must be present |

Sources: wshawk/plugin_system.py:203-235

Version Compatibility Check

graph TB
    Check["Check Version<br/>Compatibility"]
    ParseReq["Parse Required<br/>Version"]
    ParseCur["Parse Current<br/>Version"]
    CompareMajor["Compare<br/>Major Version"]
    CompareMinor["Compare<br/>Minor Version"]
    
    Check --> ParseReq
    Check --> ParseCur
    ParseReq --> CompareMajor
    ParseCur --> CompareMajor
    CompareMajor -->|"Must Match"| CompareMinor
    CompareMajor -->|"Mismatch"| Reject["Reject Plugin"]
    CompareMinor -->|"Current >= Required"| Accept["Accept Plugin"]
    CompareMinor -->|"Current < Required"| Reject

Diagram: Plugin Version Compatibility Logic

Version compatibility follows semantic versioning:

  • Major version must match exactly
  • Minor version of WSHawk must be ≥ required minor version
  • Patch version is ignored for compatibility checks

Sources: wshawk/plugin_system.py:245-261


Advanced Example: Timing-Based Detector

This example demonstrates using the context parameter for timing-based detection:

from wshawk.plugin_system import DetectorPlugin, PluginMetadata
from typing import Dict

class TimingAttackDetector(DetectorPlugin):
    """Detect time-based vulnerabilities (e.g., blind SQL injection)"""
    
    def get_metadata(self) -> PluginMetadata:
        return PluginMetadata(
            name="timing_attack_detector",
            version="1.0.0",
            description="Detect timing-based vulnerabilities",
            author="WSHawk Team"
        )
    
    def detect(self, response: str, payload: str, context: Dict = None) -> tuple[bool, str, str]:
        if not context or 'timing' not in context:
            return (False, "LOW", "No timing data available")
        
        response_time = context['timing']  # milliseconds
        
        # Check for sleep/delay payloads
        sleep_indicators = ['sleep(', 'waitfor delay', 'pg_sleep', 'benchmark(']
        has_sleep_payload = any(ind in payload.lower() for ind in sleep_indicators)
        
        if has_sleep_payload and response_time > 5000:  # 5+ seconds
            return (True, "HIGH", f"Time-based injection confirmed ({response_time}ms delay)")
        elif response_time > 3000:  # 3+ seconds
            return (True, "MEDIUM", f"Suspicious delay detected ({response_time}ms)")
        
        return (False, "LOW", f"Normal response time ({response_time}ms)")

Key Features:

  • Validates context dictionary presence
  • Correlates payload content with response timing
  • Uses timing thresholds for confidence scoring
  • Provides detailed timing information in descriptions

Sources: wshawk/plugin_system.py:68-79


Best Practices

Performance Optimization

| Practice | Rationale | |----------|-----------| | Avoid regex when possible | String matching is faster than regex for simple patterns | | Case-normalize once | Convert to lowercase once, not per indicator | | Short-circuit evaluation | Return early on clear negatives | | Cache compiled patterns | Use @lru_cache for expensive operations |

Confidence Scoring Guidelines

graph TD
    Start["Analyze Response"]
    CountInd["Count Indicators"]
    Multi{">= 3 Indicators?"}
    Some{"1-2 Indicators?"}
    
    Start --> CountInd
    CountInd --> Multi
    Multi -->|"Yes"| High["Return HIGH<br/>with details"]
    Multi -->|"No"| Some
    Some -->|"Yes"| Medium["Return MEDIUM<br/>with caution"]
    Some -->|"No"| Low["Return LOW<br/>or False"]

Diagram: Confidence Scoring Decision Tree

Recommendations:

  • HIGH: Multiple specific indicators, low false positive rate
  • MEDIUM: Single strong indicator or multiple weak indicators, possible false positives
  • LOW: Very weak signals, high false positive rate
  • False: No credible vulnerability indicators

Security Considerations

  1. Input Sanitization: Never execute or eval user-controlled data
  2. Resource Limits: Limit processing time to prevent DoS
  3. Safe Imports: Only import trusted libraries
  4. Error Messages: Avoid leaking sensitive information in descriptions

Sources: wshawk/plugin_system.py:478-491


Integration with Scanner

Detector plugins automatically integrate with WSHawkV2 scanner:

graph LR
    Scanner["WSHawkV2<br/>Scanner"]
    SendPayload["Send Payload<br/>via WebSocket"]
    ReceiveResp["Receive<br/>Response"]
    PluginMgr["PluginManager.<br/>run_detectors()"]
    AllDetectors["Iterate All<br/>Detector Plugins"]
    Aggregate["Aggregate<br/>Results"]
    Report["Add to<br/>Scan Report"]
    
    Scanner --> SendPayload
    SendPayload --> ReceiveResp
    ReceiveResp --> PluginMgr
    PluginMgr --> AllDetectors
    AllDetectors --> Aggregate
    Aggregate --> Report

Diagram: Detector Plugin Integration with Scanner

The scanner automatically:

  1. Initializes PluginManager at startup
  2. Calls run_detectors() for each response
  3. Aggregates results with built-in detectors
  4. Includes plugin detections in vulnerability reports
  5. Applies CVSS scoring based on confidence levels

Sources: wshawk/plugin_system.py:375-407


Testing Detector Plugins

Manual Testing

Use the built-in test harness in plugin_system.py:

python3 -m wshawk.plugin_system

This executes test cases and validates:

  • Plugin registration
  • Metadata validation
  • Detection logic
  • Result formatting

Sources: wshawk/plugin_system.py:498-550

Unit Testing

Create unit tests for individual detector methods:

def test_nosql_detector():
    detector = AdvancedNoSQLDetector()
    
    # Test HIGH confidence
    response = "MongoDB error: Query failed with $ne operator in mongoose"
    is_vuln, conf, desc = detector.detect(response, "test")
    assert is_vuln == True
    assert conf == "HIGH"
    
    # Test MEDIUM confidence
    response = "Error: mongodb connection failed"
    is_vuln, conf, desc = detector.detect(response, "test")
    assert is_vuln == True
    assert conf == "MEDIUM"
    
    # Test negative case
    response = "Normal response"
    is_vuln, conf, desc = detector.detect(response, "test")
    assert is_vuln == False

Summary

Creating detector plugins involves:

  1. Extend DetectorPlugin: Inherit from DetectorPlugin base class
  2. Implement get_metadata(): Provide plugin metadata with versioning
  3. Implement detect(): Core detection logic returning (bool, str, str)
  4. Optional get_supported_types(): Declare supported vulnerability types
  5. Place in plugins/: Save as .py file in plugins directory
  6. Test: Validate detection logic and error handling

Detector plugins are automatically discovered, validated, and integrated into the scanning pipeline. They can access response data, timing information, and server context for sophisticated vulnerability detection.

Sources: wshawk/plugin_system.py:64-494, plugins/README.md:1-37