Adding Vulnerability Detection Modules
Purpose and Scope
This page guides developers through the process of creating new vulnerability detection modules for WSHawk. It covers the standard detection pattern, integration points, verification logic, error handling, and CVSS scoring requirements.
For information about adding mutation strategies to evade WAFs, see Adding Mutation Strategies. For payload collection management, see Adding Payload Collections. For general development setup, see Development Environment Setup.
Vulnerability Detection Architecture
WSHawk's vulnerability detection system follows a modular architecture where the WSHawkV2 scanner orchestrates individual test methods that leverage specialized verification and analysis modules.
graph TB
Scanner["WSHawkV2 Scanner<br/>(scanner_v2.py)"]
subgraph "Test Methods"
SQL["test_sql_injection_v2()"]
XSS["test_xss_v2()"]
CMD["test_command_injection_v2()"]
XXE["test_xxe_v2()"]
NoSQL["test_nosql_injection_v2()"]
SSRF["test_ssrf_v2()"]
Path["test_path_traversal_v2()"]
end
subgraph "Analysis & Verification"
Verifier["VulnerabilityVerifier<br/>vulnerability_verifier.py"]
Analyzer["MessageAnalyzer<br/>message_intelligence.py"]
Fingerprint["ServerFingerprinter<br/>server_fingerprint.py"]
end
subgraph "Advanced Verification"
Browser["HeadlessBrowserXSSVerifier<br/>headless_xss_verifier.py"]
OAST["OASTProvider<br/>oast_provider.py"]
end
subgraph "Data Sources"
Payloads["WSPayloads<br/>__main__.py"]
end
Scanner --> SQL
Scanner --> XSS
Scanner --> CMD
Scanner --> XXE
Scanner --> NoSQL
Scanner --> SSRF
Scanner --> Path
SQL --> Verifier
XSS --> Verifier
CMD --> Verifier
Path --> Verifier
SQL --> Analyzer
XSS --> Analyzer
CMD --> Analyzer
SQL --> Fingerprint
CMD --> Fingerprint
XSS --> Browser
XXE --> OAST
SSRF --> OAST
SQL --> Payloads
XSS --> Payloads
CMD --> Payloads
XXE --> Payloads
NoSQL --> Payloads
Path --> Payloads
Sources: wshawk/scanner_v2.py:1-678
Standard Vulnerability Detection Pattern
All vulnerability detection methods in WSHawk follow a consistent pattern to ensure maintainability and reliability.
Method Signature Pattern
async def test_<vulnerability_type>_v2(self, ws) -> List[Dict]
- Method name: test_<type>_v2, where <type> describes the vulnerability
- Parameters: ws (WebSocket connection object)
- Returns: List[Dict] containing results with payload and confidence level
Detection Flow
flowchart TD
Start["test_*_v2(ws) called"]
Log["Log test initiation<br/>Logger.info()"]
LoadPayloads["Load payloads<br/>WSPayloads.get_*()"]
CheckFingerprint["Check ServerFingerprinter<br/>for targeted payloads"]
GetBase["Get base_message from<br/>self.sample_messages"]
LoopStart["For each payload"]
Inject["Inject payload using<br/>MessageAnalyzer.inject_payload_into_message()"]
RateLimit["Apply rate limiting<br/>await asyncio.sleep(0.05)"]
Send["Send via ws.send(msg)<br/>Increment self.messages_sent"]
Receive["Receive via ws.recv()<br/>with timeout=2.0"]
Verify["Verify using<br/>VulnerabilityVerifier.verify_*()"]
CheckConfidence{"confidence !=<br/>ConfidenceLevel.LOW"}
LogVuln["Logger.vuln() with details"]
AppendVuln["Append to self.vulnerabilities"]
AppendResult["Append to results list"]
HandleTimeout["Handle asyncio.TimeoutError"]
HandleError["Handle general Exception"]
LoopEnd{"More payloads?"}
Return["Return results"]
Start --> Log
Log --> LoadPayloads
LoadPayloads --> CheckFingerprint
CheckFingerprint --> GetBase
GetBase --> LoopStart
LoopStart --> Inject
Inject --> RateLimit
RateLimit --> Send
Send --> Receive
Receive --> Verify
Receive -->|"TimeoutError"| HandleTimeout
Verify --> CheckConfidence
CheckConfidence -->|"Yes"| LogVuln
LogVuln --> AppendVuln
AppendVuln --> AppendResult
AppendResult --> LoopEnd
CheckConfidence -->|"No"| LoopEnd
HandleTimeout --> LoopEnd
Inject -->|"Exception"| HandleError
HandleError --> LoopEnd
LoopEnd -->|"Yes"| LoopStart
LoopEnd -->|"No"| Return
Sources: wshawk/scanner_v2.py:143-210, wshawk/scanner_v2.py:212-290, wshawk/scanner_v2.py:292-356
Core Components
Vulnerability Data Structure
Each detected vulnerability must follow this standard structure:
| Field | Type | Required | Description |
|-------|------|----------|-------------|
| type | str | Yes | Human-readable vulnerability type (e.g., "SQL Injection") |
| severity | str | Yes | Severity level: "CRITICAL", "HIGH", "MEDIUM", "LOW" |
| confidence | str | Yes | Confidence level: "CRITICAL", "HIGH", "MEDIUM", "LOW" |
| description | str | Yes | Detailed description of what was detected |
| payload | str | Yes | The payload that triggered the vulnerability |
| response_snippet | str | Yes | First 200 characters of the server response |
| recommendation | str | Yes | Remediation guidance |
| cvss_score | float | Optional | CVSS v3.1 score (0.0-10.0) |
| browser_verified | bool | Optional | Whether browser verification succeeded (XSS) |
| Additional fields | Various | Optional | Module-specific metadata |
Sources: wshawk/scanner_v2.py:190-198, wshawk/scanner_v2.py:270-279
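The required fields in the table above can be checked mechanically before a finding is recorded. The following is a minimal sketch and not part of WSHawk: the helper name `missing_required_fields` and the constant `REQUIRED_FIELDS` are assumptions introduced here for illustration.

```python
from typing import Any, Dict, List

# Required keys, taken from the rows marked "Yes" in the table above.
REQUIRED_FIELDS = (
    "type", "severity", "confidence", "description",
    "payload", "response_snippet", "recommendation",
)


def missing_required_fields(vuln: Dict[str, Any]) -> List[str]:
    """Return the required keys that are absent from a vulnerability record."""
    return [field for field in REQUIRED_FIELDS if field not in vuln]


# Example record with every required field present (values are illustrative).
record = {
    "type": "SQL Injection",
    "severity": "HIGH",
    "confidence": "HIGH",
    "description": "Database error message returned",
    "payload": "' OR '1'='1",
    "response_snippet": "syntax error near ...",
    "recommendation": "Use parameterized queries",
}
```

Calling such a helper just before `self.vulnerabilities.append(...)` would surface incomplete records during development rather than at report time.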
ConfidenceLevel Enum
The VulnerabilityVerifier class uses a ConfidenceLevel enum to rate detection confidence:
- CRITICAL: Verified execution (e.g., browser-executed XSS)
- HIGH: Strong indicators with minimal false positive risk
- MEDIUM: Multiple indicators present, but needs manual verification
- LOW: Weak indicators, likely false positive
Sources: wshawk/scanner_v2.py:16-17
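As a rough illustration, an enum with these four levels might look like the sketch below. The string values are an assumption, inferred from the way `confidence.value` is stored in vulnerability records; the real definition may differ. `should_report` is a hypothetical helper that mirrors the scanner's "anything above LOW" filter.

```python
from enum import Enum


class ConfidenceLevel(Enum):
    """Illustrative stand-in for the verifier's enum; string values are assumed."""
    LOW = "LOW"
    MEDIUM = "MEDIUM"
    HIGH = "HIGH"
    CRITICAL = "CRITICAL"


def should_report(confidence: ConfidenceLevel) -> bool:
    """Mirror the scanner's filter: every level except LOW is recorded."""
    return confidence != ConfidenceLevel.LOW
```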
Step-by-Step: Creating a New Vulnerability Detector
Step 1: Add Test Method to WSHawkV2
Create a new async method in the WSHawkV2 class following the naming convention test_<vulnerability_type>_v2:
async def test_<vulnerability_type>_v2(self, ws) -> List[Dict]:
    """
    Enhanced <vulnerability type> testing with automated verification
    """
    Logger.info("Testing <vulnerability type>...")
    results = []
    # Implementation follows...
    return results
Location: Add method to wshawk/scanner_v2.py:28-677 in the WSHawkV2 class.
Sources: wshawk/scanner_v2.py:28-677
Step 2: Load Payloads
Load vulnerability-specific payloads from the WSPayloads class:
payloads = WSPayloads.get_<vulnerability_type>()[:100] # Limit for performance
The WSPayloads class provides static methods for each vulnerability type. If adding a new type, you must also add a corresponding method to WSPayloads (see Adding Payload Collections).
Sources: wshawk/scanner_v2.py:26, wshawk/scanner_v2.py:150
Step 3: Server Fingerprinting Integration
Leverage server fingerprinting to use targeted payloads:
fingerprint = self.fingerprinter.fingerprint()
if fingerprint.database:  # or .language, .framework
    recommended = self.fingerprinter.get_recommended_payloads(fingerprint)
    if recommended.get('<vulnerability_type>'):
        Logger.info(f"Using {fingerprint.database}-specific payloads")
        payloads = recommended['<vulnerability_type>'] + payloads[:50]
Sources: wshawk/scanner_v2.py:153-158, wshawk/scanner_v2.py:302-307
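Concatenating targeted and generic payloads can send the same string twice when both lists contain it. A minimal sketch of an order-preserving merge follows; `merge_payloads` is a hypothetical helper, not an existing WSHawk function, and the default limit of 50 simply echoes the slice used above.

```python
from typing import List


def merge_payloads(targeted: List[str], generic: List[str], generic_limit: int = 50) -> List[str]:
    """Put targeted payloads first, then up to generic_limit generic ones, dropping duplicates."""
    merged: List[str] = []
    seen = set()
    for payload in targeted + generic[:generic_limit]:
        if payload not in seen:
            seen.add(payload)
            merged.append(payload)
    return merged
```

Keeping targeted payloads at the front means the most promising candidates are tried before any rate limit or connection drop cuts the test short.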
Step 4: Payload Injection
Use the learning phase results to intelligently inject payloads:
base_message = self.sample_messages[0] if self.sample_messages else '{"test": "value"}'
for payload in payloads:
    try:
        # Automated injection into message structure
        if self.learning_complete and self.message_analyzer.detected_format == MessageFormat.JSON:
            injected_messages = self.message_analyzer.inject_payload_into_message(
                base_message, payload
            )
        else:
            injected_messages = [payload]
        for msg in injected_messages:
            # Send and verify each injected message (see Step 5)
            ...
    except Exception as e:
        Logger.error(f"<Vulnerability> test error: {e}")
This leverages the MessageAnalyzer to inject payloads into appropriate fields of JSON/XML messages detected during the learning phase.
Sources: wshawk/scanner_v2.py:161-173, wshawk/scanner_v2.py:15
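To make the idea of field-level injection concrete, the sketch below produces one message variant per top-level string field of a JSON message. It is an illustration only: `inject_into_json_fields` is a hypothetical function, and the actual `MessageAnalyzer.inject_payload_into_message()` may choose fields differently or handle nested structures.

```python
import json
from typing import List


def inject_into_json_fields(base_message: str, payload: str) -> List[str]:
    """Return one message per top-level string field, with that field replaced by the payload."""
    try:
        data = json.loads(base_message)
    except json.JSONDecodeError:
        return [payload]  # Not JSON: fall back to sending the raw payload
    if not isinstance(data, dict):
        return [payload]
    variants = []
    for key, value in data.items():
        if isinstance(value, str):
            mutated = dict(data)  # Shallow copy so other fields keep their sample values
            mutated[key] = payload
            variants.append(json.dumps(mutated))
    return variants or [payload]
```

Mutating one field at a time keeps the rest of the message valid, which makes it more likely that the server routes the message to the handler under test.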
Step 5: Send and Receive
Implement rate-limited sending and receiving with proper error handling:
for msg in injected_messages:
    await ws.send(msg)
    self.messages_sent += 1
    try:
        response = await asyncio.wait_for(ws.recv(), timeout=2.0)
        self.messages_received += 1
        # Verification logic here...
    except asyncio.TimeoutError:
        pass  # Expected for some payloads
    await asyncio.sleep(0.05)  # Rate limiting
Key points:
- Increment self.messages_sent and self.messages_received for statistics
- Use asyncio.wait_for() with timeout to prevent hangs
- Handle asyncio.TimeoutError gracefully
- Apply rate limiting with asyncio.sleep() (or use self.rate_limiter.acquire())
Sources: wshawk/scanner_v2.py:174-204, wshawk/scanner_v2.py:19
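The send/receive pattern can be exercised without a live server by substituting a small test double for the WebSocket. In the sketch below, `FakeWebSocket`, `send_and_receive` and `demo_send_receive` are all hypothetical names introduced for illustration; only the `asyncio` calls are real APIs.

```python
import asyncio
from typing import List, Optional


class FakeWebSocket:
    """Test double: echoes messages, but never answers ones containing 'silent'."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()

    async def send(self, msg: str) -> None:
        if "silent" not in msg:
            await self._queue.put(f"echo:{msg}")

    async def recv(self) -> str:
        return await self._queue.get()


async def send_and_receive(ws, msg: str, timeout: float = 2.0) -> Optional[str]:
    """Send one message and return the response, or None if the server stays silent."""
    await ws.send(msg)
    try:
        return await asyncio.wait_for(ws.recv(), timeout=timeout)
    except asyncio.TimeoutError:
        return None


async def demo_send_receive() -> List[Optional[str]]:
    ws = FakeWebSocket()
    responses = []
    for msg in ["hello", "silent payload"]:
        responses.append(await send_and_receive(ws, msg, timeout=0.1))
        await asyncio.sleep(0.01)  # Rate limiting between messages
    return responses
```

Returning `None` on timeout lets the caller skip verification for that payload while still moving on to the next one.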
Step 6: Verification Logic
Add verification logic to VulnerabilityVerifier or implement inline verification:
Option A: Add to VulnerabilityVerifier
Create a method in the VulnerabilityVerifier class:
def verify_<vulnerability_type>(self, response: str, payload: str) -> Tuple[bool, ConfidenceLevel, str]:
    """
    Verify <vulnerability type> based on response heuristics

    Returns:
        (is_vulnerable, confidence_level, description)
    """
    # Heuristic checks (strong_indicator and weak_indicator are placeholders)
    if strong_indicator in response:
        return True, ConfidenceLevel.HIGH, "Description"
    elif weak_indicator in response:
        return True, ConfidenceLevel.MEDIUM, "Description"
    return False, ConfidenceLevel.LOW, "Not vulnerable"
Then call it in your test method:
is_vuln, confidence, description = self.verifier.verify_<vulnerability_type>(
    response, payload
)
Sources: wshawk/scanner_v2.py:16, wshawk/scanner_v2.py:182-185
Option B: Inline Verification
For simpler checks, implement verification directly in the test method:
vulnerability_indicators = ['error_msg_1', 'error_msg_2', 'specific_pattern']
if any(ind.lower() in response.lower() for ind in vulnerability_indicators):
    Logger.vuln("<Vulnerability Type> [HIGH]: Description")
    # Append to self.vulnerabilities
Sources: wshawk/scanner_v2.py:432-444, wshawk/scanner_v2.py:472-484
Step 7: Record Vulnerabilities
When a vulnerability is detected with sufficient confidence:
if is_vuln and confidence != ConfidenceLevel.LOW:
    Logger.vuln(f"<Vulnerability Type> [{confidence.value}]: {description}")
    Logger.vuln(f"Payload: {payload[:80]}")
    self.vulnerabilities.append({
        'type': '<Vulnerability Type Full Name>',
        'severity': confidence.value,
        'confidence': confidence.value,
        'description': description,
        'payload': payload,
        'response_snippet': response[:200],
        'recommendation': 'Specific remediation advice',
        'cvss_score': calculated_cvss_score  # Optional
    })
    results.append({'payload': payload, 'confidence': confidence.value})
Sources: wshawk/scanner_v2.py:186-199, wshawk/scanner_v2.py:377-388
Step 8: Integration with Scanner
Add your test method to the run_heuristic_scan() method:
async def run_heuristic_scan(self):
    # ... existing setup code ...
    # Run ALL tests with heuristics and rate limiting
    await self.test_sql_injection_v2(ws)
    print()
    await self.test_xss_v2(ws)
    print()
    # Add your new test here
    await self.test_<vulnerability_type>_v2(ws)
    print()
    # ... remaining tests ...
Sources: wshawk/scanner_v2.py:542-584
Advanced Verification Integration
Headless Browser Verification (XSS)
For XSS vulnerabilities, integrate with the HeadlessBrowserXSSVerifier for real execution verification:
browser_verified = False
if confidence == ConfidenceLevel.HIGH and self.use_headless_browser:
    try:
        if not self.headless_verifier:
            self.headless_verifier = HeadlessBrowserXSSVerifier()
            await self.headless_verifier.start()
        is_executed, evidence = await self.headless_verifier.verify_xss_execution(
            response, payload
        )
        if is_executed:
            browser_verified = True
            confidence = ConfidenceLevel.CRITICAL
            description = f"REAL EXECUTION: {evidence}"
    except Exception as e:
        Logger.error(f"Browser verification failed: {e}")
Sources: wshawk/scanner_v2.py:248-263, wshawk/scanner_v2.py:21
OAST Integration (XXE, SSRF, Blind RCE)
For blind vulnerabilities, integrate with the OASTProvider:
# Start OAST if enabled
if self.use_oast and not self.oast_provider:
    try:
        self.oast_provider = OASTProvider(use_interactsh=False, custom_server="localhost:8888")
        await self.oast_provider.start()
        Logger.info("OAST provider started for blind detection")
    except Exception as e:
        Logger.error(f"OAST start failed: {e}")
        self.use_oast = False

# Generate OAST payload
if self.use_oast and self.oast_provider:
    oast_payload = self.oast_provider.generate_payload('<vuln_type>', f'test{len(results)}')
    msg = json.dumps({"action": "parse_xml", "xml": oast_payload})
else:
    msg = json.dumps({"action": "parse_xml", "xml": payload})
After sending, poll for callbacks to confirm blind vulnerabilities.
Sources: wshawk/scanner_v2.py:407-423, wshawk/scanner_v2.py:22
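The polling step is not shown in the snippet above, and the OASTProvider method for retrieving callbacks is not documented on this page. The sketch below therefore takes the lookup as a parameter: `check_callbacks` is a hypothetical async callable standing in for whatever the provider exposes, and `poll_for_callbacks` is an illustrative helper rather than WSHawk code.

```python
import asyncio
from typing import Awaitable, Callable, List


async def poll_for_callbacks(
    check_callbacks: Callable[[], Awaitable[List[str]]],
    attempts: int = 5,
    interval: float = 1.0,
) -> List[str]:
    """Call check_callbacks until it reports interactions or the attempts run out."""
    for _ in range(attempts):
        hits = await check_callbacks()
        if hits:
            return hits
        await asyncio.sleep(interval)
    return []


async def demo_polling() -> List[str]:
    calls = {"count": 0}

    async def fake_check() -> List[str]:
        # Simulates a callback that arrives on the third poll
        calls["count"] += 1
        return ["dns:test0"] if calls["count"] >= 3 else []

    return await poll_for_callbacks(fake_check, attempts=5, interval=0.01)
```

Bounding the number of attempts matters because out-of-band callbacks can arrive late or never, and the scan should not block on them indefinitely.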
Error Handling Best Practices
Timeout Handling
Always wrap ws.recv() in asyncio.wait_for() to prevent indefinite hangs:
try:
    response = await asyncio.wait_for(ws.recv(), timeout=2.0)
    self.messages_received += 1
    # Process response...
except asyncio.TimeoutError:
    pass  # Some payloads intentionally cause timeouts
Adjust timeout based on vulnerability type:
- Standard tests: timeout=2.0
- Slow operations (SSRF, file reads): timeout=3.0
- Command injection with sleep: timeout=5.0
Sources: wshawk/scanner_v2.py:177-202, wshawk/scanner_v2.py:516-534
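One way to keep these values consistent across test methods is a small lookup table. This is a sketch only: the category labels and the `recv_timeout` helper are assumptions made for illustration, while the numbers come from the list above.

```python
# Timeout values from the list above; the dictionary keys are illustrative labels.
RECV_TIMEOUTS = {
    "standard": 2.0,
    "slow_operation": 3.0,      # SSRF, file reads
    "time_based_command": 5.0,  # Command injection with sleep
}


def recv_timeout(category: str) -> float:
    """Return the receive timeout for a test category, defaulting to the standard value."""
    return RECV_TIMEOUTS.get(category, RECV_TIMEOUTS["standard"])
```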
Exception Handling
Wrap each payload iteration in a try-except to ensure one failure doesn't stop the entire scan:
for payload in payloads:
    try:
        # Injection, send, receive, verify logic...
        ...
    except Exception as e:
        Logger.error(f"<Vulnerability> test error: {e}")
        continue  # Continue with next payload
Sources: wshawk/scanner_v2.py:206-208, wshawk/scanner_v2.py:287-288
Connection State Management
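Catch disconnections when sending and try to reconnect before continuing. This snippet runs inside the payload loop, which is why it can break out when the reconnect fails:
try:
    await ws.send(msg)
except websockets.exceptions.ConnectionClosed:
    Logger.error("Connection closed, attempting reconnect...")
    ws = await self.connect()
    if not ws:
        break  # Reconnect failed: stop testing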
Sources: wshawk/scanner_v2.py:77-85
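A single reconnect attempt may fail if the server needs a moment to recover. The sketch below wraps a connect callable in a bounded retry loop with exponential backoff. It assumes, as the `if not ws` check above suggests, that the connect function returns a falsy value on failure; `reconnect_with_retries` and `demo_reconnect` are hypothetical names.

```python
import asyncio
from typing import Awaitable, Callable, Optional, TypeVar

T = TypeVar("T")


async def reconnect_with_retries(
    connect: Callable[[], Awaitable[Optional[T]]],
    max_attempts: int = 3,
    base_delay: float = 0.5,
) -> Optional[T]:
    """Try connect() up to max_attempts times, doubling the delay after each failure."""
    for attempt in range(max_attempts):
        ws = await connect()
        if ws is not None:
            return ws
        if attempt < max_attempts - 1:
            await asyncio.sleep(base_delay * (2 ** attempt))
    return None


async def demo_reconnect() -> Optional[str]:
    attempts = {"count": 0}

    async def flaky_connect() -> Optional[str]:
        # Fails once, then succeeds on the second attempt
        attempts["count"] += 1
        return "connection" if attempts["count"] == 2 else None

    return await reconnect_with_retries(flaky_connect, base_delay=0.01)
```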
CVSS Scoring
Calculating Scores
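Assign each finding a CVSS v3.1 score based on its characteristics. Either derive the score with a CVSS calculator or implement scoring logic such as the following, which starts from a per-type base score and scales it by detection confidence. The confidence scaling is a reporting heuristic and is not part of the CVSS v3.1 base score formula:
def calculate_cvss_score(self, vuln_type: str, confidence: str) -> float:
    """
    Estimate a CVSS v3.1 score for a vulnerability
    """
    base_scores = {
        'SQL Injection': 9.0,
        'Command Injection': 9.8,
        'XSS': 6.1,
        'Path Traversal': 7.5,
        'XXE': 8.2,
        # Add your vulnerability type
    }
    score = base_scores.get(vuln_type, 5.0)
    # Adjust based on confidence
    if confidence == 'CRITICAL':
        multiplier = 1.0
    elif confidence == 'HIGH':
        multiplier = 0.9
    elif confidence == 'MEDIUM':
        multiplier = 0.7
    else:
        multiplier = 0.5
    # CVSS scores are reported to one decimal place
    return round(score * multiplier, 1)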
Score Assignment
Assign scores when appending vulnerabilities:
cvss_score = self.calculate_cvss_score('<Vulnerability Type>', confidence.value)
self.vulnerabilities.append({
    'type': '<Vulnerability Type>',
    'severity': confidence.value,
    'confidence': confidence.value,
    'description': description,
    'payload': payload,
    'response_snippet': response[:200],
    'recommendation': 'Remediation advice',
    'cvss_score': cvss_score
})
Sources: wshawk/scanner_v2.py:608
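When adding a new vulnerability type, a base score can be derived from a CVSS v3.1 vector instead of being picked by hand. The sketch below implements the base score equations and metric weights from the CVSS v3.1 specification. The function names are illustrative, and which vector fits a given finding remains a judgement call.

```python
import math

# Metric weights from the CVSS v3.1 specification
AV = {"N": 0.85, "A": 0.62, "L": 0.55, "P": 0.2}
AC = {"L": 0.77, "H": 0.44}
UI = {"N": 0.85, "R": 0.62}
CIA = {"H": 0.56, "L": 0.22, "N": 0.0}
PR_UNCHANGED = {"N": 0.85, "L": 0.62, "H": 0.27}
PR_CHANGED = {"N": 0.85, "L": 0.68, "H": 0.5}


def roundup(value: float) -> float:
    """Round up to one decimal place, following the v3.1 Roundup definition."""
    scaled = round(value * 100000)
    if scaled % 10000 == 0:
        return scaled / 100000.0
    return (math.floor(scaled / 10000) + 1) / 10.0


def cvss31_base_score(av: str, ac: str, pr: str, ui: str, scope: str,
                      c: str, i: str, a: str) -> float:
    """Compute the CVSS v3.1 base score from single-letter metric values."""
    changed = scope == "C"
    iss = 1 - (1 - CIA[c]) * (1 - CIA[i]) * (1 - CIA[a])
    if changed:
        impact = 7.52 * (iss - 0.029) - 3.25 * (iss - 0.02) ** 15
    else:
        impact = 6.42 * iss
    pr_weight = (PR_CHANGED if changed else PR_UNCHANGED)[pr]
    exploitability = 8.22 * AV[av] * AC[ac] * pr_weight * UI[ui]
    if impact <= 0:
        return 0.0
    total = impact + exploitability
    if changed:
        return roundup(min(1.08 * total, 10))
    return roundup(min(total, 10))
```

For example, the vector AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H yields 9.8 and AV:N/AC:L/PR:N/UI:R/S:C/C:L/I:L/A:N yields 6.1, which match the Command Injection and XSS entries in the table of base scores above.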
Complete Implementation Example
Here's a complete example showing all components integrated for a hypothetical LDAP injection detector:
flowchart TB
Method["test_ldap_injection_v2(ws)"]
Init["Logger.info('Testing LDAP injection...')<br/>results = []"]
LoadPay["payloads = WSPayloads.get_ldap_injection()[:50]"]
Base["base_message = self.sample_messages[0]"]
Loop["for payload in payloads"]
TryBlock["try block"]
Inject["injected_messages = self.message_analyzer<br/>.inject_payload_into_message(base, payload)"]
InnerLoop["for msg in injected_messages"]
Send["await ws.send(msg)<br/>self.messages_sent += 1"]
Recv["response = await asyncio.wait_for(<br/>ws.recv(), timeout=2.0)<br/>self.messages_received += 1"]
Verify["is_vuln, confidence, desc =<br/>self.verifier.verify_ldap_injection(<br/>response, payload)"]
CheckConf{"confidence !=<br/>LOW"}
LogVuln["Logger.vuln(f'LDAP Injection [{confidence}]: {desc}')<br/>Logger.vuln(f'Payload: {payload[:80]}')"]
CalcCVSS["cvss_score = calculate_cvss_score(<br/>'LDAP Injection', confidence.value)"]
Append["self.vulnerabilities.append({<br/>'type': 'LDAP Injection',<br/>'severity': confidence.value,<br/>'cvss_score': cvss_score,<br/>...})"]
AppendRes["results.append({<br/>'payload': payload,<br/>'confidence': confidence.value})"]
Sleep["await asyncio.sleep(0.05)"]
Timeout["except asyncio.TimeoutError: pass"]
Exception["except Exception as e: continue"]
Return["return results"]
Method --> Init
Init --> LoadPay
LoadPay --> Base
Base --> Loop
Loop --> TryBlock
TryBlock --> Inject
Inject --> InnerLoop
InnerLoop --> Send
Send --> Recv
Recv --> Verify
Recv -->|"TimeoutError"| Timeout
Verify --> CheckConf
CheckConf -->|"Yes"| LogVuln
LogVuln --> CalcCVSS
CalcCVSS --> Append
Append --> AppendRes
AppendRes --> Sleep
CheckConf -->|"No"| Sleep
Timeout --> Sleep
Sleep --> InnerLoop
InnerLoop -->|"Done"| Loop
Inject -->|"Exception"| Exception
Exception --> Loop
Loop -->|"Done"| Return
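The flow in the diagram can be condensed into a runnable sketch. Everything here is a stand-in so the example executes without WSHawk installed: `MiniScanner`, `StubVerifier` and `StubWebSocket` are hypothetical, the enum values are assumed, payloads are passed in as an argument instead of coming from WSPayloads, and fingerprinting, message injection and CVSS scoring are omitted for brevity.

```python
import asyncio
from enum import Enum
from typing import Dict, List, Tuple


class ConfidenceLevel(Enum):  # Stand-in; string values are assumed
    LOW = "LOW"
    MEDIUM = "MEDIUM"
    HIGH = "HIGH"


class StubVerifier:
    """Stand-in for VulnerabilityVerifier with a single LDAP heuristic."""

    def verify_ldap_injection(self, response: str, payload: str) -> Tuple[bool, ConfidenceLevel, str]:
        if "invalid dn syntax" in response.lower():
            return True, ConfidenceLevel.HIGH, "LDAP error: invalid dn syntax"
        return False, ConfidenceLevel.LOW, "Not vulnerable"


class StubWebSocket:
    """Replies with an LDAP error whenever the message contains a '*' wildcard."""

    def __init__(self) -> None:
        self._responses: asyncio.Queue = asyncio.Queue()

    async def send(self, msg: str) -> None:
        await self._responses.put("Invalid DN syntax" if "*" in msg else "ok")

    async def recv(self) -> str:
        return await self._responses.get()


class MiniScanner:
    """Holds only the state that the detection loop touches."""

    def __init__(self) -> None:
        self.verifier = StubVerifier()
        self.vulnerabilities: List[Dict] = []
        self.messages_sent = 0
        self.messages_received = 0

    async def test_ldap_injection_v2(self, ws, payloads: List[str]) -> List[Dict]:
        results: List[Dict] = []
        for payload in payloads:
            try:
                await ws.send(payload)
                self.messages_sent += 1
                try:
                    response = await asyncio.wait_for(ws.recv(), timeout=2.0)
                    self.messages_received += 1
                except asyncio.TimeoutError:
                    continue  # No response for this payload
                is_vuln, confidence, desc = self.verifier.verify_ldap_injection(response, payload)
                if is_vuln and confidence != ConfidenceLevel.LOW:
                    self.vulnerabilities.append({
                        "type": "LDAP Injection",
                        "severity": confidence.value,
                        "confidence": confidence.value,
                        "description": desc,
                        "payload": payload,
                        "response_snippet": response[:200],
                        "recommendation": "Escape LDAP filter metacharacters in user input",
                    })
                    results.append({"payload": payload, "confidence": confidence.value})
                await asyncio.sleep(0.01)  # Rate limiting (shortened for the demo)
            except Exception as exc:
                print(f"LDAP injection test error: {exc}")
                continue
        return results


async def run_demo() -> Tuple[List[Dict], MiniScanner]:
    scanner = MiniScanner()
    results = await scanner.test_ldap_injection_v2(StubWebSocket(), ["admin", "*)(uid=*"])
    return results, scanner
```

Running `asyncio.run(run_demo())` sends two payloads and records a single HIGH-confidence finding for the wildcard payload.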
Verification Method in VulnerabilityVerifier
def verify_ldap_injection(self, response: str, payload: str) -> Tuple[bool, ConfidenceLevel, str]:
    """
    Verify LDAP injection vulnerability
    """
    response_lower = response.lower()
    # Strong indicators: LDAP-specific error strings (reported as HIGH)
    strong_patterns = [
        'ldap_operations_error',
        'ldap syntax error',
        'invalid dn syntax',
        'objectclass violation'
    ]
    # Weaker indicators: generic directory errors (reported as MEDIUM)
    weak_patterns = [
        'ldap error',
        'directory server',
        'invalid filter',
        'search failed'
    ]
    # Check strong indicators first
    for pattern in strong_patterns:
        if pattern in response_lower:
            return True, ConfidenceLevel.HIGH, f"LDAP error: {pattern}"
    # Then check weaker indicators
    for pattern in weak_patterns:
        if pattern in response_lower:
            return True, ConfidenceLevel.MEDIUM, f"Potential LDAP issue: {pattern}"
    # Payload reflection alone is LOW confidence; the scanner filters it out
    if payload in response:
        return True, ConfidenceLevel.LOW, "Payload reflected, manual verification needed"
    return False, ConfidenceLevel.LOW, "Not vulnerable"
Integration with run_heuristic_scan
async def run_heuristic_scan(self):
    # ... existing setup ...
    # Existing tests
    await self.test_sql_injection_v2(ws)
    print()
    # Add new LDAP test
    await self.test_ldap_injection_v2(ws)
    print()
    # ... remaining tests ...
Sources: wshawk/scanner_v2.py:143-210, wshawk/scanner_v2.py:542-584
Testing Your Module
Unit Testing
Add tests to the test suite (see Testing Infrastructure):
# Module path assumed from the architecture diagram; adjust it to your checkout
from wshawk.vulnerability_verifier import VulnerabilityVerifier, ConfidenceLevel


def test_ldap_injection_detection():
    """Test LDAP injection detector"""
    verifier = VulnerabilityVerifier()

    # Strong indicator: expect HIGH confidence
    response_critical = '{"error": "LDAP_OPERATIONS_ERROR: invalid DN"}'
    is_vuln, conf, desc = verifier.verify_ldap_injection(response_critical, "test")
    assert is_vuln
    assert conf == ConfidenceLevel.HIGH

    # Weaker indicator: expect MEDIUM confidence
    response_medium = '{"error": "LDAP error occurred"}'
    is_vuln, conf, desc = verifier.verify_ldap_injection(response_medium, "test")
    assert is_vuln
    assert conf == ConfidenceLevel.MEDIUM

    # Clean response: no vulnerability
    response_clean = '{"status": "success"}'
    is_vuln, conf, desc = verifier.verify_ldap_injection(response_clean, "test")
    assert not is_vuln
Sources: CONTRIBUTING.md:50-60
Integration Testing
Test against a vulnerable WebSocket endpoint:
# Run full scan with your new module
python -m wshawk ws://vulnerable-app:8080/ws
Verify:
- Module executes without errors
- Vulnerabilities are detected correctly
- False positives are minimal
- Report includes your findings
- CVSS scores are appropriate
Sources: CONTRIBUTING.md:54-60
Checklist for New Module
Before submitting your vulnerability detection module, ensure:
- [ ] Method follows naming convention test_<type>_v2(self, ws) -> List[Dict]
- [ ] Payloads loaded from WSPayloads or added to payload collections
- [ ] Server fingerprinting leveraged for targeted payloads
- [ ] Message analyzer used for intelligent injection (JSON/XML/Binary)
- [ ] Rate limiting applied (asyncio.sleep() or rate_limiter.acquire())
- [ ] Timeouts configured on all ws.recv() calls
- [ ] Exception handling prevents crashes
- [ ] Verification logic returns (bool, ConfidenceLevel, str)
- [ ] Vulnerabilities stored in standard format with all required fields
- [ ] CVSS scores calculated and assigned
- [ ] Logging uses appropriate levels (Logger.info, Logger.vuln, Logger.error)
- [ ] Statistics updated (self.messages_sent, self.messages_received)
- [ ] Integrated into run_heuristic_scan()
- [ ] Unit tests added
- [ ] Documentation updated in code comments
Sources: CONTRIBUTING.md:94-100
Related Documentation
- Analysis and Verification Modules - Details on MessageAnalyzer, VulnerabilityVerifier, and ServerFingerprinter
- Vulnerability Detection Modules - Complete reference of all 11 built-in vulnerability types
- Adding Mutation Strategies - WAF evasion techniques
- Adding Payload Collections - Payload file formats and organization
- CVSS Scoring System - Detailed CVSS v3.1 implementation guide