Adding Vulnerability Detection Modules

Adding Vulnerability Detection Modules

The following files were used as context for generating this wiki page:

Purpose and Scope

This page guides developers through the process of creating new vulnerability detection modules for WSHawk. It covers the standard detection pattern, integration points, verification logic, error handling, and CVSS scoring requirements.

For information about adding mutation strategies to evade WAFs, see Adding Mutation Strategies. For payload collection management, see Adding Payload Collections. For general development setup, see Development Environment Setup.


Vulnerability Detection Architecture

WSHawk's vulnerability detection system follows a modular architecture where the WSHawkV2 scanner orchestrates individual test methods that leverage specialized verification and analysis modules.

graph TB
    Scanner["WSHawkV2 Scanner<br/>(scanner_v2.py)"]
    
    subgraph "Test Methods"
        SQL["test_sql_injection_v2()"]
        XSS["test_xss_v2()"]
        CMD["test_command_injection_v2()"]
        XXE["test_xxe_v2()"]
        NoSQL["test_nosql_injection_v2()"]
        SSRF["test_ssrf_v2()"]
        Path["test_path_traversal_v2()"]
    end
    
    subgraph "Analysis & Verification"
        Verifier["VulnerabilityVerifier<br/>vulnerability_verifier.py"]
        Analyzer["MessageAnalyzer<br/>message_intelligence.py"]
        Fingerprint["ServerFingerprinter<br/>server_fingerprint.py"]
    end
    
    subgraph "Advanced Verification"
        Browser["HeadlessBrowserXSSVerifier<br/>headless_xss_verifier.py"]
        OAST["OASTProvider<br/>oast_provider.py"]
    end
    
    subgraph "Data Sources"
        Payloads["WSPayloads<br/>__main__.py"]
    end
    
    Scanner --> SQL
    Scanner --> XSS
    Scanner --> CMD
    Scanner --> XXE
    Scanner --> NoSQL
    Scanner --> SSRF
    Scanner --> Path
    
    SQL --> Verifier
    XSS --> Verifier
    CMD --> Verifier
    Path --> Verifier
    
    SQL --> Analyzer
    XSS --> Analyzer
    CMD --> Analyzer
    
    SQL --> Fingerprint
    CMD --> Fingerprint
    
    XSS --> Browser
    XXE --> OAST
    SSRF --> OAST
    
    SQL --> Payloads
    XSS --> Payloads
    CMD --> Payloads
    XXE --> Payloads
    NoSQL --> Payloads
    Path --> Payloads

Sources: wshawk/scanner_v2.py:1-678


Standard Vulnerability Detection Pattern

All vulnerability detection methods in WSHawk follow a consistent pattern to ensure maintainability and reliability.

Method Signature Pattern

async def test_<vulnerability_type>_v2(self, ws) -> List[Dict]
  • Method name: test_<type>_v2 where <type> describes the vulnerability
  • Parameters: ws (WebSocket connection object)
  • Returns: List[Dict] containing results with payload and confidence level

Detection Flow

flowchart TD
    Start["test_*_v2(ws) called"]
    Log["Log test initiation<br/>Logger.info()"]
    LoadPayloads["Load payloads<br/>WSPayloads.get_*()"]
    CheckFingerprint["Check ServerFingerprinter<br/>for targeted payloads"]
    GetBase["Get base_message from<br/>self.sample_messages"]
    
    LoopStart["For each payload"]
    Inject["Inject payload using<br/>MessageAnalyzer.inject_payload_into_message()"]
    RateLimit["Apply rate limiting<br/>await asyncio.sleep(0.05)"]
    Send["Send via ws.send(msg)<br/>Increment self.messages_sent"]
    Receive["Receive via ws.recv()<br/>with timeout=2.0"]
    
    Verify["Verify using<br/>VulnerabilityVerifier.verify_*()"]
    CheckConfidence{"confidence !=<br/>ConfidenceLevel.LOW"}
    
    LogVuln["Logger.vuln() with details"]
    AppendVuln["Append to self.vulnerabilities"]
    AppendResult["Append to results list"]
    
    HandleTimeout["Handle asyncio.TimeoutError"]
    HandleError["Handle general Exception"]
    
    LoopEnd{"More payloads?"}
    Return["Return results"]
    
    Start --> Log
    Log --> LoadPayloads
    LoadPayloads --> CheckFingerprint
    CheckFingerprint --> GetBase
    GetBase --> LoopStart
    
    LoopStart --> Inject
    Inject --> RateLimit
    RateLimit --> Send
    Send --> Receive
    
    Receive --> Verify
    Receive -->|"TimeoutError"| HandleTimeout
    
    Verify --> CheckConfidence
    CheckConfidence -->|"Yes"| LogVuln
    LogVuln --> AppendVuln
    AppendVuln --> AppendResult
    
    AppendResult --> LoopEnd
    CheckConfidence -->|"No"| LoopEnd
    HandleTimeout --> LoopEnd
    
    Inject -->|"Exception"| HandleError
    HandleError --> LoopEnd
    
    LoopEnd -->|"Yes"| LoopStart
    LoopEnd -->|"No"| Return

Sources: wshawk/scanner_v2.py:143-210, wshawk/scanner_v2.py:212-290, wshawk/scanner_v2.py:292-356


Core Components

Vulnerability Data Structure

Each detected vulnerability must follow this standard structure:

| Field | Type | Required | Description | |-------|------|----------|-------------| | type | str | Yes | Human-readable vulnerability type (e.g., "SQL Injection") | | severity | str | Yes | Severity level: "CRITICAL", "HIGH", "MEDIUM", "LOW" | | confidence | str | Yes | Confidence level: "CRITICAL", "HIGH", "MEDIUM", "LOW" | | description | str | Yes | Detailed description of what was detected | | payload | str | Yes | The payload that triggered the vulnerability | | response_snippet | str | Yes | First 200 characters of the server response | | recommendation | str | Yes | Remediation guidance | | cvss_score | float | Optional | CVSS v3.1 score (0.0-10.0) | | browser_verified | bool | Optional | Whether browser verification succeeded (XSS) | | Additional fields | Various | Optional | Module-specific metadata |

Sources: wshawk/scanner_v2.py:190-198, wshawk/scanner_v2.py:270-279

ConfidenceLevel Enum

The VulnerabilityVerifier class uses a ConfidenceLevel enum to rate detection confidence:

  • CRITICAL: Verified execution (e.g., browser-executed XSS)
  • HIGH: Strong indicators with minimal false positive risk
  • MEDIUM: Multiple indicators present, but needs manual verification
  • LOW: Weak indicators, likely false positive

Sources: wshawk/scanner_v2.py:16-17


Step-by-Step: Creating a New Vulnerability Detector

Step 1: Add Test Method to WSHawkV2

Create a new async method in the WSHawkV2 class following the naming convention test_<vulnerability_type>_v2:

async def test_<vulnerability_type>_v2(self, ws) -> List[Dict]:
    """
    Enhanced <vulnerability type> testing with automated verification
    """
    Logger.info("Testing <vulnerability type>...")
    
    results = []
    # Implementation follows...
    return results

Location: Add method to wshawk/scanner_v2.py:28-677 in the WSHawkV2 class.

Sources: wshawk/scanner_v2.py:28-677

Step 2: Load Payloads

Load vulnerability-specific payloads from the WSPayloads class:

payloads = WSPayloads.get_<vulnerability_type>()[:100]  # Limit for performance

The WSPayloads class provides static methods for each vulnerability type. If adding a new type, you must also add a corresponding method to WSPayloads (see Adding Payload Collections).

Sources: wshawk/scanner_v2.py:26, wshawk/scanner_v2.py:150

Step 3: Server Fingerprinting Integration

Leverage server fingerprinting to use targeted payloads:

fingerprint = self.fingerprinter.fingerprint()
if fingerprint.database:  # or .language, .framework
    recommended = self.fingerprinter.get_recommended_payloads(fingerprint)
    if recommended.get('<vulnerability_type>'):
        Logger.info(f"Using {fingerprint.database}-specific payloads")
        payloads = recommended['<vulnerability_type>'] + payloads[:50]

Sources: wshawk/scanner_v2.py:153-158, wshawk/scanner_v2.py:302-307

Step 4: Payload Injection

Use the learning phase results to intelligently inject payloads:

base_message = self.sample_messages[0] if self.sample_messages else '{"test": "value"}'

for payload in payloads:
    try:
        # Automated injection into message structure
        if self.learning_complete and self.message_analyzer.detected_format == MessageFormat.JSON:
            injected_messages = self.message_analyzer.inject_payload_into_message(
                base_message, payload
            )
        else:
            injected_messages = [payload]
        
        for msg in injected_messages:
            # Send and verify each injected message

This leverages the MessageAnalyzer to inject payloads into appropriate fields of JSON/XML messages detected during the learning phase.

Sources: wshawk/scanner_v2.py:161-173, wshawk/scanner_v2.py:15

Step 5: Send and Receive

Implement rate-limited sending and receiving with proper error handling:

for msg in injected_messages:
    await ws.send(msg)
    self.messages_sent += 1
    
    try:
        response = await asyncio.wait_for(ws.recv(), timeout=2.0)
        self.messages_received += 1
        
        # Verification logic here...
        
    except asyncio.TimeoutError:
        pass  # Expected for some payloads
    
    await asyncio.sleep(0.05)  # Rate limiting

Key points:

  • Increment self.messages_sent and self.messages_received for statistics
  • Use asyncio.wait_for() with timeout to prevent hangs
  • Handle asyncio.TimeoutError gracefully
  • Apply rate limiting with asyncio.sleep() (or use self.rate_limiter.acquire())

Sources: wshawk/scanner_v2.py:174-204, wshawk/scanner_v2.py:19

Step 6: Verification Logic

Add verification logic to VulnerabilityVerifier or implement inline verification:

Option A: Add to VulnerabilityVerifier

Create a method in the VulnerabilityVerifier class:

def verify_<vulnerability_type>(self, response: str, payload: str) -> Tuple[bool, ConfidenceLevel, str]:
    """
    Verify <vulnerability type> based on response heuristics
    
    Returns:
        (is_vulnerable, confidence_level, description)
    """
    # Heuristic checks
    if strong_indicator in response:
        return True, ConfidenceLevel.HIGH, "Description"
    elif weak_indicator in response:
        return True, ConfidenceLevel.MEDIUM, "Description"
    
    return False, ConfidenceLevel.LOW, "Not vulnerable"

Then call it in your test method:

is_vuln, confidence, description = self.verifier.verify_<vulnerability_type>(
    response, payload
)

Sources: wshawk/scanner_v2.py:16, wshawk/scanner_v2.py:182-185

Option B: Inline Verification

For simpler checks, implement verification directly in the test method:

vulnerability_indicators = ['error_msg_1', 'error_msg_2', 'specific_pattern']
if any(ind.lower() in response.lower() for ind in vulnerability_indicators):
    Logger.vuln(f"<Vulnerability Type> [HIGH]: Description")
    # Append to self.vulnerabilities

Sources: wshawk/scanner_v2.py:432-444, wshawk/scanner_v2.py:472-484

Step 7: Record Vulnerabilities

When a vulnerability is detected with sufficient confidence:

if is_vuln and confidence != ConfidenceLevel.LOW:
    Logger.vuln(f"<Vulnerability Type> [{confidence.value}]: {description}")
    Logger.vuln(f"Payload: {payload[:80]}")
    
    self.vulnerabilities.append({
        'type': '<Vulnerability Type Full Name>',
        'severity': confidence.value,
        'confidence': confidence.value,
        'description': description,
        'payload': payload,
        'response_snippet': response[:200],
        'recommendation': 'Specific remediation advice',
        'cvss_score': calculated_cvss_score  # Optional
    })
    results.append({'payload': payload, 'confidence': confidence.value})

Sources: wshawk/scanner_v2.py:186-199, wshawk/scanner_v2.py:377-388

Step 8: Integration with Scanner

Add your test method to the run_heuristic_scan() method:

async def run_heuristic_scan(self):
    # ... existing setup code ...
    
    # Run ALL tests with heuristics and rate limiting
    await self.test_sql_injection_v2(ws)
    print()
    
    await self.test_xss_v2(ws)
    print()
    
    # Add your new test here
    await self.test_<vulnerability_type>_v2(ws)
    print()
    
    # ... remaining tests ...

Sources: wshawk/scanner_v2.py:542-584


Advanced Verification Integration

Headless Browser Verification (XSS)

For XSS vulnerabilities, integrate with the HeadlessBrowserXSSVerifier for real execution verification:

browser_verified = False
if confidence == ConfidenceLevel.HIGH and self.use_headless_browser:
    try:
        if not self.headless_verifier:
            self.headless_verifier = HeadlessBrowserXSSVerifier()
            await self.headless_verifier.start()
        
        is_executed, evidence = await self.headless_verifier.verify_xss_execution(
            response, payload
        )
        
        if is_executed:
            browser_verified = True
            confidence = ConfidenceLevel.CRITICAL
            description = f"REAL EXECUTION: {evidence}"
    except Exception as e:
        Logger.error(f"Browser verification failed: {e}")

Sources: wshawk/scanner_v2.py:248-263, wshawk/scanner_v2.py:21

OAST Integration (XXE, SSRF, Blind RCE)

For blind vulnerabilities, integrate with the OASTProvider:

# Start OAST if enabled
if self.use_oast and not self.oast_provider:
    try:
        self.oast_provider = OASTProvider(use_interactsh=False, custom_server="localhost:8888")
        await self.oast_provider.start()
        Logger.info("OAST provider started for blind detection")
    except Exception as e:
        Logger.error(f"OAST start failed: {e}")
        self.use_oast = False

# Generate OAST payload
if self.use_oast and self.oast_provider:
    oast_payload = self.oast_provider.generate_payload('<vuln_type>', f'test{len(results)}')
    msg = json.dumps({"action": "parse_xml", "xml": oast_payload})
else:
    msg = json.dumps({"action": "parse_xml", "xml": payload})

After sending, poll for callbacks to confirm blind vulnerabilities.

Sources: wshawk/scanner_v2.py:407-423, wshawk/scanner_v2.py:22


Error Handling Best Practices

Timeout Handling

Always wrap ws.recv() in asyncio.wait_for() to prevent indefinite hangs:

try:
    response = await asyncio.wait_for(ws.recv(), timeout=2.0)
    self.messages_received += 1
    # Process response...
except asyncio.TimeoutError:
    pass  # Some payloads intentionally cause timeouts

Adjust timeout based on vulnerability type:

  • Standard tests: timeout=2.0
  • Slow operations (SSRF, file reads): timeout=3.0
  • Command injection with sleep: timeout=5.0

Sources: wshawk/scanner_v2.py:177-202, wshawk/scanner_v2.py:516-534

Exception Handling

Wrap each payload iteration in a try-except to ensure one failure doesn't stop the entire scan:

for payload in payloads:
    try:
        # Injection, send, receive, verify logic...
    except Exception as e:
        Logger.error(f"<Vulnerability> test error: {e}")
        continue  # Continue with next payload

Sources: wshawk/scanner_v2.py:206-208, wshawk/scanner_v2.py:287-288

Connection State Management

Check connection state before sending and handle disconnections:

try:
    await ws.send(msg)
except websockets.exceptions.ConnectionClosed:
    Logger.error("Connection closed, attempting reconnect...")
    ws = await self.connect()
    if not ws:
        break

Sources: wshawk/scanner_v2.py:77-85


CVSS Scoring

Calculating Scores

Implement CVSS v3.1 scoring based on vulnerability characteristics. Use the CVSS calculator or implement scoring logic:

def calculate_cvss_score(self, vuln_type: str, confidence: str) -> float:
    """
    Calculate CVSS v3.1 score for vulnerability
    """
    base_scores = {
        'SQL Injection': 9.0,
        'Command Injection': 9.8,
        'XSS': 6.1,
        'Path Traversal': 7.5,
        'XXE': 8.2,
        # Add your vulnerability type
    }
    
    score = base_scores.get(vuln_type, 5.0)
    
    # Adjust based on confidence
    if confidence == 'CRITICAL':
        return score
    elif confidence == 'HIGH':
        return score * 0.9
    elif confidence == 'MEDIUM':
        return score * 0.7
    else:
        return score * 0.5

Score Assignment

Assign scores when appending vulnerabilities:

cvss_score = self.calculate_cvss_score('<Vulnerability Type>', confidence.value)

self.vulnerabilities.append({
    'type': '<Vulnerability Type>',
    'severity': confidence.value,
    'confidence': confidence.value,
    'description': description,
    'payload': payload,
    'response_snippet': response[:200],
    'recommendation': 'Remediation advice',
    'cvss_score': cvss_score
})

Sources: wshawk/scanner_v2.py:608


Complete Implementation Example

Here's a complete example showing all components integrated for a hypothetical LDAP injection detector:

flowchart TB
    Method["test_ldap_injection_v2(ws)"]
    
    Init["Logger.info('Testing LDAP injection...')<br/>results = []"]
    LoadPay["payloads = WSPayloads.get_ldap_injection()[:50]"]
    Base["base_message = self.sample_messages[0]"]
    
    Loop["for payload in payloads"]
    TryBlock["try block"]
    Inject["injected_messages = self.message_analyzer<br/>.inject_payload_into_message(base, payload)"]
    
    InnerLoop["for msg in injected_messages"]
    Send["await ws.send(msg)<br/>self.messages_sent += 1"]
    Recv["response = await asyncio.wait_for(<br/>ws.recv(), timeout=2.0)<br/>self.messages_received += 1"]
    
    Verify["is_vuln, confidence, desc =<br/>self.verifier.verify_ldap_injection(<br/>response, payload)"]
    
    CheckConf{"confidence !=<br/>LOW"}
    
    LogVuln["Logger.vuln(f'LDAP Injection [{confidence}]: {desc}')<br/>Logger.vuln(f'Payload: {payload[:80]}')"]
    
    CalcCVSS["cvss_score = calculate_cvss_score(<br/>'LDAP Injection', confidence.value)"]
    
    Append["self.vulnerabilities.append({<br/>'type': 'LDAP Injection',<br/>'severity': confidence.value,<br/>'cvss_score': cvss_score,<br/>...})"]
    
    AppendRes["results.append({<br/>'payload': payload,<br/>'confidence': confidence.value})"]
    
    Sleep["await asyncio.sleep(0.05)"]
    
    Timeout["except asyncio.TimeoutError: pass"]
    Exception["except Exception as e: continue"]
    
    Return["return results"]
    
    Method --> Init
    Init --> LoadPay
    LoadPay --> Base
    Base --> Loop
    
    Loop --> TryBlock
    TryBlock --> Inject
    Inject --> InnerLoop
    
    InnerLoop --> Send
    Send --> Recv
    Recv --> Verify
    Recv -->|"TimeoutError"| Timeout
    
    Verify --> CheckConf
    CheckConf -->|"Yes"| LogVuln
    LogVuln --> CalcCVSS
    CalcCVSS --> Append
    Append --> AppendRes
    AppendRes --> Sleep
    
    CheckConf -->|"No"| Sleep
    Timeout --> Sleep
    Sleep --> InnerLoop
    
    InnerLoop -->|"Done"| Loop
    Inject -->|"Exception"| Exception
    Exception --> Loop
    
    Loop -->|"Done"| Return

Verification Method in VulnerabilityVerifier

def verify_ldap_injection(self, response: str, payload: str) -> Tuple[bool, ConfidenceLevel, str]:
    """
    Verify LDAP injection vulnerability
    """
    response_lower = response.lower()
    
    # Critical indicators - strong evidence
    critical_patterns = [
        'ldap_operations_error',
        'ldap syntax error',
        'invalid dn syntax',
        'objectclass violation'
    ]
    
    # High confidence indicators
    high_patterns = [
        'ldap error',
        'directory server',
        'invalid filter',
        'search failed'
    ]
    
    # Check for critical indicators
    for pattern in critical_patterns:
        if pattern in response_lower:
            return True, ConfidenceLevel.HIGH, f"LDAP error: {pattern}"
    
    # Check for high confidence indicators
    for pattern in high_patterns:
        if pattern in response_lower:
            return True, ConfidenceLevel.MEDIUM, f"Potential LDAP issue: {pattern}"
    
    # Check for payload reflection (low confidence)
    if payload in response:
        return True, ConfidenceLevel.LOW, "Payload reflected, manual verification needed"
    
    return False, ConfidenceLevel.LOW, "Not vulnerable"

Integration with run_heuristic_scan

async def run_heuristic_scan(self):
    # ... existing setup ...
    
    # Existing tests
    await self.test_sql_injection_v2(ws)
    print()
    
    # Add new LDAP test
    await self.test_ldap_injection_v2(ws)
    print()
    
    # ... remaining tests ...

Sources: wshawk/scanner_v2.py:143-210, wshawk/scanner_v2.py:542-584


Testing Your Module

Unit Testing

Add tests to the test suite (see Testing Infrastructure):

def test_ldap_injection_detection():
    """Test LDAP injection detector"""
    verifier = VulnerabilityVerifier()
    
    # Test critical indicator
    response_critical = '{"error": "LDAP_OPERATIONS_ERROR: invalid DN"}'
    is_vuln, conf, desc = verifier.verify_ldap_injection(response_critical, "test")
    assert is_vuln == True
    assert conf == ConfidenceLevel.HIGH
    
    # Test medium confidence
    response_medium = '{"error": "LDAP error occurred"}'
    is_vuln, conf, desc = verifier.verify_ldap_injection(response_medium, "test")
    assert is_vuln == True
    assert conf == ConfidenceLevel.MEDIUM
    
    # Test no vulnerability
    response_clean = '{"status": "success"}'
    is_vuln, conf, desc = verifier.verify_ldap_injection(response_clean, "test")
    assert is_vuln == False

Sources: CONTRIBUTING.md:50-60

Integration Testing

Test against a vulnerable WebSocket endpoint:

# Run full scan with your new module
python -m wshawk ws://vulnerable-app:8080/ws

Verify:

  1. Module executes without errors
  2. Vulnerabilities are detected correctly
  3. False positives are minimal
  4. Report includes your findings
  5. CVSS scores are appropriate

Sources: CONTRIBUTING.md:54-60


Checklist for New Module

Before submitting your vulnerability detection module, ensure:

  • [ ] Method follows naming convention test_<type>_v2(self, ws) -> List[Dict]
  • [ ] Payloads loaded from WSPayloads or added to payload collections
  • [ ] Server fingerprinting leveraged for targeted payloads
  • [ ] Message analyzer used for intelligent injection (JSON/XML/Binary)
  • [ ] Rate limiting applied (asyncio.sleep() or rate_limiter.acquire())
  • [ ] Timeouts configured on all ws.recv() calls
  • [ ] Exception handling prevents crashes
  • [ ] Verification logic returns (bool, ConfidenceLevel, str)
  • [ ] Vulnerabilities stored in standard format with all required fields
  • [ ] CVSS scores calculated and assigned
  • [ ] Logging uses appropriate levels (Logger.info, Logger.vuln, Logger.error)
  • [ ] Statistics updated (self.messages_sent, self.messages_received)
  • [ ] Integrated into run_heuristic_scan()
  • [ ] Unit tests added
  • [ ] Documentation updated in code comments

Sources: CONTRIBUTING.md:94-100


Related Documentation

Sources: CONTRIBUTING.md:1-155, wshawk/scanner_v2.py:1-678