Creating Detector Plugins
Creating Detector Plugins
The following files were used as context for generating this wiki page:
This document explains how to create custom detector plugins for WSHawk. Detector plugins extend the vulnerability detection capabilities by implementing custom detection logic for new vulnerability types or improving detection accuracy for existing types.
For information about the overall plugin architecture and PluginManager, see Plugin Architecture Overview. For creating payload plugins, see Creating Payload Plugins. For protocol-specific handlers, see Creating Protocol Plugins.
Overview
Detector plugins analyze server responses to identify vulnerabilities. Each plugin implements custom detection logic and returns a confidence score with a description. The plugin system automatically integrates detector plugins into the scanning pipeline, running them against all collected responses.
Sources: wshawk/plugin_system.py:64-84
DetectorPlugin Base Class
Class Hierarchy
graph TB
PluginBase["PluginBase (ABC)"]
DetectorPlugin["DetectorPlugin"]
CustomDetector["Custom Detector<br/>Implementation"]
PluginBase -->|"inherits"| DetectorPlugin
DetectorPlugin -->|"inherits"| CustomDetector
PluginBase -->|"requires"| get_metadata["get_metadata()<br/>→ PluginMetadata"]
DetectorPlugin -->|"requires"| detect["detect(response, payload, context)<br/>→ (bool, str, str)"]
DetectorPlugin -->|"optional"| get_supported_types["get_supported_types()<br/>→ List[str]"]
Diagram: DetectorPlugin Class Hierarchy and Required Methods
The DetectorPlugin class extends PluginBase and requires implementation of two methods:
get_metadata()- Returns plugin metadata for validationdetect()- Analyzes responses for vulnerabilities
Sources: wshawk/plugin_system.py:35-51, wshawk/plugin_system.py:64-84
Required Methods
get_metadata() Method
All detector plugins must implement get_metadata() to return a PluginMetadata instance:
def get_metadata(self) -> PluginMetadata:
return PluginMetadata(
name="detector_name", # Unique identifier
version="1.0.0", # Semver format
description="Description", # Human-readable
author="Author Name", # Optional
min_wshawk_version="2.0.0" # Minimum WSHawk version
)
| Field | Type | Required | Description |
|-------|------|----------|-------------|
| name | str | Yes | Unique plugin identifier (no spaces) |
| version | str | Yes | Semantic version (e.g., "1.0.0") |
| description | str | Yes | Human-readable description |
| author | str | No | Plugin author (default: "Regaan") |
| min_wshawk_version | str | No | Minimum compatible WSHawk version |
| requires | List[str] | No | Python package dependencies |
Sources: wshawk/plugin_system.py:18-33, wshawk/plugin_system.py:470-476
detect() Method
The core detection logic is implemented in the detect() method:
def detect(self, response: str, payload: str, context: Dict = None) -> tuple[bool, str, str]:
"""
Args:
response: Server WebSocket response message
payload: Original payload that was sent
context: Additional context (headers, timing, etc.)
Returns:
Tuple of (is_vulnerable, confidence_level, description)
"""
Return Tuple Structure:
| Position | Type | Values | Description | |----------|------|--------|-------------| | 0 | bool | True/False | Whether vulnerability detected | | 1 | str | "LOW", "MEDIUM", "HIGH" | Confidence level | | 2 | str | Any string | Human-readable description |
Sources: wshawk/plugin_system.py:67-80
Detection Logic Implementation
Basic Detection Pattern
graph LR
Input["Response + Payload<br/>+ Context"]
Parse["Parse Response<br/>Extract Indicators"]
Match["Match Patterns<br/>Count Indicators"]
Score["Calculate<br/>Confidence"]
Return["Return Tuple<br/>(bool, str, str)"]
Input --> Parse
Parse --> Match
Match --> Score
Score --> Return
Diagram: Typical Detector Plugin Flow
A typical detector implementation follows this pattern:
- Parse Response: Extract relevant data from the response string
- Pattern Matching: Check for vulnerability indicators
- Confidence Scoring: Determine confidence based on indicator count
- Result Construction: Build descriptive message and return tuple
Sources: wshawk/plugin_system.py:478-491
Confidence Level Guidelines
| Confidence | Criteria | Example |
|------------|----------|---------|
| "HIGH" | Multiple strong indicators present | 3+ specific error messages matching vulnerability |
| "MEDIUM" | Single strong indicator or multiple weak indicators | 1-2 indicators, or ambiguous error message |
| "LOW" | Weak indicators or uncertain detection | Single generic indicator, possible false positive |
Sources: wshawk/plugin_system.py:486-490
Context Dictionary
The context parameter provides additional information about the request/response cycle:
graph TB
context["context: Dict"]
headers["headers: Dict<br/>Response headers"]
timing["timing: float<br/>Response time (ms)"]
status["status: int<br/>WebSocket close code"]
metadata["metadata: Dict<br/>Server fingerprint"]
context --> headers
context --> timing
context --> status
context --> metadata
Diagram: Context Dictionary Structure
The context dictionary may contain:
headers: HTTP or WebSocket headerstiming: Response timing informationstatus: Connection status or close codesmetadata: Server fingerprint from MessageAnalyzer
Plugins can use this data for advanced detection logic (e.g., timing-based attacks, header analysis).
Sources: wshawk/plugin_system.py:68-79
Complete Example: AdvancedNoSQLDetector
The following example demonstrates a complete detector plugin implementation:
from wshawk.plugin_system import DetectorPlugin, PluginMetadata
from typing import Dict, List
class AdvancedNoSQLDetector(DetectorPlugin):
"""Advanced NoSQL injection detector with context analysis"""
def get_metadata(self) -> PluginMetadata:
return PluginMetadata(
name="nosql_detector_advanced",
version="1.0.0",
description="Advanced NoSQL injection detector with context analysis",
author="WSHawk Team",
min_wshawk_version="2.0.0"
)
def detect(self, response: str, payload: str, context: Dict = None) -> tuple[bool, str, str]:
# Define NoSQL-specific indicators
nosql_indicators = ['mongodb', 'bson', 'query error', '$ne', '$gt', 'mongoose']
# Case-insensitive search
response_lower = response.lower()
# Count matching indicators
matches = sum(1 for ind in nosql_indicators if ind in response_lower)
# Determine confidence based on indicator count
if matches >= 2:
return (True, "HIGH", f"NoSQL injection detected ({matches} indicators)")
elif matches == 1:
return (True, "MEDIUM", "Possible NoSQL injection")
return (False, "LOW", "No NoSQL indicators found")
def get_supported_types(self) -> List[str]:
return ["nosql", "mongodb", "couchdb"]
Key Implementation Details:
- Indicator List: Pre-defined list of strings that suggest NoSQL vulnerabilities
- Case Normalization: Convert response to lowercase for reliable matching
- Threshold-Based Scoring: Different confidence levels based on indicator count
- Descriptive Messages: Include match count in HIGH confidence results
- Supported Types: Declare which vulnerability categories this detector handles
Sources: wshawk/plugin_system.py:467-494
Optional Methods
get_supported_types()
Declare which vulnerability types the detector supports:
def get_supported_types(self) -> List[str]:
return ["nosql", "mongodb", "couchdb"]
This helps the PluginManager organize detectors and enables targeted detection for specific vulnerability categories.
Sources: wshawk/plugin_system.py:82-84, wshawk/plugin_system.py:493-494
Plugin Registration and Execution
Registration Process
graph TB
Create["Create Plugin<br/>Instance"]
Register["PluginManager.<br/>register_plugin()"]
Validate["_validate_plugin()<br/>Check metadata"]
CheckDup["Check for<br/>Duplicates"]
Store["Store in<br/>_detector_plugins"]
Create --> Register
Register --> Validate
Validate -->|"Valid"| CheckDup
Validate -->|"Invalid"| Fail["Registration<br/>Failed"]
CheckDup -->|"No Duplicate"| Store
CheckDup -->|"Duplicate"| Fail
Store --> Success["Registration<br/>Success"]
Diagram: Detector Plugin Registration Flow
Registration steps:
- Instantiate detector class
- Call
PluginManager.register_plugin(instance) - Manager validates metadata (name, version, compatibility)
- Manager checks for duplicate plugin names
- Plugin stored in
_detector_pluginsdictionary
Sources: wshawk/plugin_system.py:320-333, wshawk/plugin_system.py:263-313
Execution in Scanning Pipeline
graph LR
Response["Response<br/>Received"]
Manager["PluginManager.<br/>run_detectors()"]
LazyLoad["Lazy Load<br/>Plugins"]
Iterate["Iterate<br/>Detectors"]
Execute["Execute<br/>detect()"]
Collect["Collect<br/>Results"]
Response --> Manager
Manager --> LazyLoad
LazyLoad --> Iterate
Iterate --> Execute
Execute --> Collect
Diagram: Detector Execution in Scan Pipeline
The PluginManager.run_detectors() method:
- Lazy loads all available detector plugins
- Iterates through registered detectors
- Calls
detect()on each plugin - Collects positive results with confidence scores
- Returns list of detection dictionaries
Sources: wshawk/plugin_system.py:375-407
Detection Result Format
When a detector identifies a vulnerability, the result is structured as:
{
'plugin': 'nosql_detector_advanced', # Plugin name
'version': '1.0.0', # Plugin version
'confidence': 'HIGH', # Confidence level
'description': 'NoSQL injection detected (3 indicators)' # Description
}
Multiple detectors can identify the same vulnerability with different confidence levels. The scanner aggregates these results during the verification phase.
Sources: wshawk/plugin_system.py:396-403
Error Handling
Detector plugins should handle errors gracefully. The PluginManager wraps each detect() call in a try-except block:
try:
is_vuln, confidence, description = plugin.detect(response, payload, context)
if is_vuln:
results.append({...})
except Exception as e:
print(f"[ERROR] Detector {plugin.get_name()} failed: {e}")
Best Practices:
- Validate input parameters before processing
- Handle missing or malformed responses
- Return
(False, "LOW", "Error occurred")on internal errors - Avoid raising unhandled exceptions
Sources: wshawk/plugin_system.py:395-405
Plugin File Structure
Detector plugins are Python files placed in the plugins/ directory:
plugins/
├── README.md
├── advanced_nosql_detector.py
├── custom_xss_detector.py
└── timing_attack_detector.py
Each file can contain multiple detector classes. The PluginManager scans the directory and lazy loads plugins as needed.
Directory Requirements:
- Files must end with
.py - Files starting with
_are ignored (e.g.,__init__.py) - Each file is imported as a module from the
pluginspackage
Sources: wshawk/plugin_system.py:141-156, plugins/README.md:1-37
Validation and Versioning
Metadata Validation
The PluginManager validates detector plugins before registration:
| Validation Check | Requirement |
|-----------------|-------------|
| Name | Non-empty string, no duplicates |
| Version | Semantic versioning (e.g., "1.0.0") |
| WSHawk Compatibility | min_wshawk_version ≤ current WSHawk version |
| Required Fields | Name and version must be present |
Sources: wshawk/plugin_system.py:203-235
Version Compatibility Check
graph TB
Check["Check Version<br/>Compatibility"]
ParseReq["Parse Required<br/>Version"]
ParseCur["Parse Current<br/>Version"]
CompareMajor["Compare<br/>Major Version"]
CompareMinor["Compare<br/>Minor Version"]
Check --> ParseReq
Check --> ParseCur
ParseReq --> CompareMajor
ParseCur --> CompareMajor
CompareMajor -->|"Must Match"| CompareMinor
CompareMajor -->|"Mismatch"| Reject["Reject Plugin"]
CompareMinor -->|"Current >= Required"| Accept["Accept Plugin"]
CompareMinor -->|"Current < Required"| Reject
Diagram: Plugin Version Compatibility Logic
Version compatibility follows semantic versioning:
- Major version must match exactly
- Minor version of WSHawk must be ≥ required minor version
- Patch version is ignored for compatibility checks
Sources: wshawk/plugin_system.py:245-261
Advanced Example: Timing-Based Detector
This example demonstrates using the context parameter for timing-based detection:
from wshawk.plugin_system import DetectorPlugin, PluginMetadata
from typing import Dict
class TimingAttackDetector(DetectorPlugin):
"""Detect time-based vulnerabilities (e.g., blind SQL injection)"""
def get_metadata(self) -> PluginMetadata:
return PluginMetadata(
name="timing_attack_detector",
version="1.0.0",
description="Detect timing-based vulnerabilities",
author="WSHawk Team"
)
def detect(self, response: str, payload: str, context: Dict = None) -> tuple[bool, str, str]:
if not context or 'timing' not in context:
return (False, "LOW", "No timing data available")
response_time = context['timing'] # milliseconds
# Check for sleep/delay payloads
sleep_indicators = ['sleep(', 'waitfor delay', 'pg_sleep', 'benchmark(']
has_sleep_payload = any(ind in payload.lower() for ind in sleep_indicators)
if has_sleep_payload and response_time > 5000: # 5+ seconds
return (True, "HIGH", f"Time-based injection confirmed ({response_time}ms delay)")
elif response_time > 3000: # 3+ seconds
return (True, "MEDIUM", f"Suspicious delay detected ({response_time}ms)")
return (False, "LOW", f"Normal response time ({response_time}ms)")
Key Features:
- Validates
contextdictionary presence - Correlates payload content with response timing
- Uses timing thresholds for confidence scoring
- Provides detailed timing information in descriptions
Sources: wshawk/plugin_system.py:68-79
Best Practices
Performance Optimization
| Practice | Rationale |
|----------|-----------|
| Avoid regex when possible | String matching is faster than regex for simple patterns |
| Case-normalize once | Convert to lowercase once, not per indicator |
| Short-circuit evaluation | Return early on clear negatives |
| Cache compiled patterns | Use @lru_cache for expensive operations |
Confidence Scoring Guidelines
graph TD
Start["Analyze Response"]
CountInd["Count Indicators"]
Multi{">= 3 Indicators?"}
Some{"1-2 Indicators?"}
Start --> CountInd
CountInd --> Multi
Multi -->|"Yes"| High["Return HIGH<br/>with details"]
Multi -->|"No"| Some
Some -->|"Yes"| Medium["Return MEDIUM<br/>with caution"]
Some -->|"No"| Low["Return LOW<br/>or False"]
Diagram: Confidence Scoring Decision Tree
Recommendations:
- HIGH: Multiple specific indicators, low false positive rate
- MEDIUM: Single strong indicator or multiple weak indicators, possible false positives
- LOW: Very weak signals, high false positive rate
- False: No credible vulnerability indicators
Security Considerations
- Input Sanitization: Never execute or eval user-controlled data
- Resource Limits: Limit processing time to prevent DoS
- Safe Imports: Only import trusted libraries
- Error Messages: Avoid leaking sensitive information in descriptions
Sources: wshawk/plugin_system.py:478-491
Integration with Scanner
Detector plugins automatically integrate with WSHawkV2 scanner:
graph LR
Scanner["WSHawkV2<br/>Scanner"]
SendPayload["Send Payload<br/>via WebSocket"]
ReceiveResp["Receive<br/>Response"]
PluginMgr["PluginManager.<br/>run_detectors()"]
AllDetectors["Iterate All<br/>Detector Plugins"]
Aggregate["Aggregate<br/>Results"]
Report["Add to<br/>Scan Report"]
Scanner --> SendPayload
SendPayload --> ReceiveResp
ReceiveResp --> PluginMgr
PluginMgr --> AllDetectors
AllDetectors --> Aggregate
Aggregate --> Report
Diagram: Detector Plugin Integration with Scanner
The scanner automatically:
- Initializes
PluginManagerat startup - Calls
run_detectors()for each response - Aggregates results with built-in detectors
- Includes plugin detections in vulnerability reports
- Applies CVSS scoring based on confidence levels
Sources: wshawk/plugin_system.py:375-407
Testing Detector Plugins
Manual Testing
Use the built-in test harness in plugin_system.py:
python3 -m wshawk.plugin_system
This executes test cases and validates:
- Plugin registration
- Metadata validation
- Detection logic
- Result formatting
Sources: wshawk/plugin_system.py:498-550
Unit Testing
Create unit tests for individual detector methods:
def test_nosql_detector():
detector = AdvancedNoSQLDetector()
# Test HIGH confidence
response = "MongoDB error: Query failed with $ne operator in mongoose"
is_vuln, conf, desc = detector.detect(response, "test")
assert is_vuln == True
assert conf == "HIGH"
# Test MEDIUM confidence
response = "Error: mongodb connection failed"
is_vuln, conf, desc = detector.detect(response, "test")
assert is_vuln == True
assert conf == "MEDIUM"
# Test negative case
response = "Normal response"
is_vuln, conf, desc = detector.detect(response, "test")
assert is_vuln == False
Summary
Creating detector plugins involves:
- Extend DetectorPlugin: Inherit from
DetectorPluginbase class - Implement get_metadata(): Provide plugin metadata with versioning
- Implement detect(): Core detection logic returning
(bool, str, str) - Optional get_supported_types(): Declare supported vulnerability types
- Place in plugins/: Save as
.pyfile in plugins directory - Test: Validate detection logic and error handling
Detector plugins are automatically discovered, validated, and integrated into the scanning pipeline. They can access response data, timing information, and server context for sophisticated vulnerability detection.
Sources: wshawk/plugin_system.py:64-494, plugins/README.md:1-37