Adding Mutation Strategies
The following files were used as context for generating this wiki page:
- RELEASE_3.0.0.md
- RELEASE_SUMMARY.md
- docs/V3_COMPLETE_GUIDE.md
- requirements.txt
- wshawk/advanced_cli.py
- wshawk/scanner_v2.py
Purpose and Scope
This document provides technical guidance for implementing custom payload mutation strategies in WSHawk. Mutation strategies transform attack payloads to bypass WAF and input validation filters by applying encoding, obfuscation, and evasion techniques.
For setting up a development environment before implementing mutation strategies, see page 8.1. For understanding the overall project structure, see page 8.2. For adding vulnerability detection modules, see page 8.3.
Sources: CONTRIBUTING.md:102-108, examples/mutation_demo.py:1-67
Mutation Strategy Architecture
System Overview
WSHawk's mutation system consists of four primary components (listed under Core Components) that work together to generate and adapt payload variations during scanning:
Mutation System Architecture:
graph TB
subgraph "Base Classes"
BaseMutator["BaseMutator<br/>(wshawk/mutators/base.py)"]
MutateMethod["mutate(payload: str) -> List[str]"]
end
subgraph "Concrete Mutators"
CaseMutator["CaseVariationMutator"]
EncodeMutator["EncodingMutator"]
CommentMutator["CommentInjectionMutator"]
WhitespaceMutator["WhitespaceMutator"]
CustomMutator["CustomMutator<br/>(your implementation)"]
end
subgraph "Orchestration Layer"
PayloadMutator["PayloadMutator<br/>(payload_mutator.py)"]
MutationStrategy["MutationStrategy enum"]
AdaptiveLogic["learn_from_response()<br/>get_recommended_strategy()"]
end
subgraph "Scanner Integration"
WSHawkV2["WSHawkV2<br/>(scanner_v2.py)"]
TestMethods["test_*_v2() methods"]
WAFDetector["WAFDetector"]
end
BaseMutator --> CaseMutator
BaseMutator --> EncodeMutator
BaseMutator --> CommentMutator
BaseMutator --> WhitespaceMutator
BaseMutator --> CustomMutator
CaseMutator --> PayloadMutator
EncodeMutator --> PayloadMutator
CommentMutator --> PayloadMutator
WhitespaceMutator --> PayloadMutator
CustomMutator --> PayloadMutator
PayloadMutator --> MutationStrategy
PayloadMutator --> AdaptiveLogic
PayloadMutator --> TestMethods
WAFDetector --> AdaptiveLogic
TestMethods --> WSHawkV2
Sources: CONTRIBUTING.md:102-108, examples/mutation_demo.py:8-26
Core Components
| Component | Location | Purpose |
|-----------|----------|---------|
| BaseMutator | wshawk/mutators/base.py | Abstract base class defining mutation interface |
| PayloadMutator | wshawk/payload_mutator.py | Orchestrator managing multiple mutators and adaptive learning |
| MutationStrategy | wshawk/payload_mutator.py | Enum defining available mutation techniques |
| create_default_mutators() | wshawk/mutators/__init__.py | Factory function registering all mutators |
Sources: CONTRIBUTING.md:102-108, examples/mutation_demo.py:8-11
BaseMutator Interface
Abstract Base Class
All mutation strategies inherit from BaseMutator and implement the mutate() method. This provides a consistent interface for the PayloadMutator orchestrator.
BaseMutator Contract:
# Conceptual interface - actual implementation in wshawk/mutators/base.py
from typing import List

class BaseMutator:
    """Base class for payload mutation strategies"""

    def mutate(self, payload: str) -> List[str]:
        """
        Transform a payload using mutation strategy

        Args:
            payload: Original attack vector

        Returns:
            List of mutated variants (typically 1-10 variants)
        """
        raise NotImplementedError("Subclasses must implement mutate()")
Key Requirements:
| Requirement | Description |
|-------------|-------------|
| Return Type | Must return List[str] containing at least one variant |
| Input Preservation | Original payload should typically be included in results |
| Error Handling | Should handle malformed input gracefully |
| Performance | Should generate variants in <100ms for interactive scanning |
Sources: CONTRIBUTING.md:102-108
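The requirements above can be checked mechanically before a mutator is registered. The following is a minimal sketch, not part of WSHawk: `check_contract` and `UpperCaseMutator` are hypothetical names, and `BaseMutator` is redefined locally as a stand-in so the example runs on its own.

```python
import time
from typing import List


class BaseMutator:
    """Local stand-in for the interface described above (not the real WSHawk class)."""

    def mutate(self, payload: str) -> List[str]:
        raise NotImplementedError("Subclasses must implement mutate()")


class UpperCaseMutator(BaseMutator):
    """Hypothetical mutator used only to exercise the checker."""

    def mutate(self, payload: str) -> List[str]:
        return [payload, payload.upper()]


def check_contract(mutator: BaseMutator, payload: str, budget_s: float = 0.1) -> List[str]:
    """Return a list of contract violations; an empty list means all checks passed."""
    problems: List[str] = []
    start = time.perf_counter()
    try:
        variants = mutator.mutate(payload)
    except Exception as exc:  # a mutator should not raise on ordinary input
        return [f"mutate() raised {type(exc).__name__}: {exc}"]
    elapsed = time.perf_counter() - start

    # Return type: a list whose items are all strings
    if not isinstance(variants, list) or not all(isinstance(v, str) for v in variants):
        return ["mutate() must return List[str]"]
    if not variants:
        problems.append("mutate() must return at least one variant")
    # Input preservation: the original payload is expected among the results
    if payload not in variants:
        problems.append("original payload is not among the variants")
    # Performance: compare against the 100 ms budget from the table
    if elapsed > budget_s:
        problems.append(f"mutate() took {elapsed:.3f}s (budget {budget_s}s)")
    return problems
```

Calling `check_contract(UpperCaseMutator(), "' or 1=1--")` returns an empty list, while passing the bare `BaseMutator()` reports the `NotImplementedError`.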
MutationStrategy Enum
Available Strategy Types
The MutationStrategy enum defines all registered mutation techniques. Each enum value corresponds to a specific bypass technique:
MutationStrategy Enum Values:
# From examples/mutation_demo.py
for strategy in MutationStrategy:
    variants = mutator.mutate_payload(xss_payload, strategy, count=3)
Common Strategy Types:
| Strategy | Purpose | Example Transformation |
|----------|---------|------------------------|
| CASE_VARIATION | Bypass case-sensitive filters | SELECT → sElEcT |
| ENCODING | Evade character blacklists | <script> → %3Cscript%3E |
| COMMENT_INJECTION | Break up keyword patterns | UNION → UN/**/ION |
| WHITESPACE_MANIPULATION | Evade filters that match literal spaces | ' OR → '%09OR |
| UNICODE_NORMALIZATION | Bypass Unicode-aware filters | < → ＜ (fullwidth) |
| NULL_BYTE_INJECTION | Truncate string validation | ' OR 1=1 → ' OR 1=1%00 |
| DOUBLE_ENCODING | Evade decode-once filters | % → %2525 |
| CONCATENATION | Break signature matching | SELECT → 'SEL'+'ECT' |
Sources: examples/mutation_demo.py:21-26
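Several of the transformations in the table can be reproduced with the Python standard library alone. The sketch below is illustrative only: the function names are hypothetical and do not correspond to WSHawk's mutator classes, and the real implementations may produce different variants.

```python
from urllib.parse import quote


def url_encode(payload: str) -> str:
    """Percent-encode every reserved character (safe='' also encodes '/')."""
    return quote(payload, safe="")


def double_url_encode(payload: str) -> str:
    """Encode twice, so one decoding pass still leaves an encoded payload."""
    return quote(quote(payload, safe=""), safe="")


def null_byte_suffix(payload: str) -> str:
    """Append a URL-encoded null byte."""
    return payload + "%00"


def to_fullwidth(payload: str) -> str:
    """Map printable ASCII (0x21-0x7E) to fullwidth forms (U+FF01-U+FF5E)."""
    return "".join(
        chr(ord(c) + 0xFEE0) if 0x21 <= ord(c) <= 0x7E else c
        for c in payload
    )


def split_concat(keyword: str) -> str:
    """Split a keyword into two quoted halves joined with '+'."""
    mid = len(keyword) // 2
    return f"'{keyword[:mid]}'+'{keyword[mid:]}'"
```

For example, `url_encode("<script>")` gives `%3Cscript%3E`, `double_url_encode("'")` gives `%2527`, and `split_concat("SELECT")` gives `'SEL'+'ECT'`.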
Implementing Custom Mutation Strategies
Step-by-Step Implementation
Step 1: Create mutator file in wshawk/mutators/ directory:
wshawk/mutators/custom_mutator.py
Step 2: Implement mutator class inheriting from BaseMutator:
# File: wshawk/mutators/custom_mutator.py
from typing import List

from .base import BaseMutator


class CustomMutator(BaseMutator):
    """
    Custom mutation strategy description

    This mutator implements [specific technique] to bypass [type of filter].

    Examples:
        Input: ' OR 1=1--
        Output: [' oR 1=1--', ' OR 1=1--%00', ...]
    """

    def mutate(self, payload: str) -> List[str]:
        """
        Transform payload using custom mutation technique

        Args:
            payload: Original attack vector

        Returns:
            List of mutated payload variants
        """
        variants = []

        # Always include original payload
        variants.append(payload)

        # Apply your custom mutation logic
        # Example: reverse character order
        variants.append(payload[::-1])

        # Example: duplicate characters
        doubled = ''.join(c * 2 for c in payload)
        variants.append(doubled)

        return variants
Sources: CONTRIBUTING.md:102-108
Step 3: Register mutator in wshawk/mutators/__init__.py:
The create_default_mutators() function serves as the factory for instantiating all mutation strategies. Add your custom mutator to this list:
# File: wshawk/mutators/__init__.py
from .case_variation_mutator import CaseVariationMutator
from .encoding_mutator import EncodingMutator
from .comment_injection_mutator import CommentInjectionMutator
from .whitespace_mutator import WhitespaceMutator
from .custom_mutator import CustomMutator  # Add import


def create_default_mutators():
    """
    Factory function creating default mutator instances

    Returns:
        List of mutator instances for PayloadMutator
    """
    return [
        CaseVariationMutator(),
        EncodingMutator(),
        CommentInjectionMutator(),
        WhitespaceMutator(),
        CustomMutator(),  # Add your mutator instance
    ]
Sources: CONTRIBUTING.md:102-108
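To see why registration in the factory list is enough, it helps to picture how an orchestrator might consume that list. The sketch below is an assumption about the general pattern, not WSHawk's actual `PayloadMutator` code; `create_demo_mutators`, `collect_variants`, and both demo mutators are hypothetical.

```python
from typing import Dict, Iterable, List, Protocol


class Mutator(Protocol):
    """Anything with a mutate() method matching the BaseMutator contract."""

    def mutate(self, payload: str) -> List[str]: ...


class ReverseMutator:
    """Demo mutator: reverses the payload."""

    def mutate(self, payload: str) -> List[str]:
        return [payload, payload[::-1]]


class DoubleCharMutator:
    """Demo mutator: doubles every character."""

    def mutate(self, payload: str) -> List[str]:
        return [payload, "".join(c * 2 for c in payload)]


def create_demo_mutators() -> List[Mutator]:
    """Stand-in for a factory such as create_default_mutators()."""
    return [ReverseMutator(), DoubleCharMutator()]


def collect_variants(mutators: Iterable[Mutator], payload: str) -> List[str]:
    """Run every registered mutator and merge results, keeping first-seen order."""
    seen: Dict[str, None] = {}
    for mutator in mutators:
        for variant in mutator.mutate(payload):
            seen.setdefault(variant, None)
    return list(seen)
```

Because the consumer only iterates over the list, appending one more instance to the factory makes the new mutator's variants appear without further changes to the calling code.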
Mutation Strategy Implementation Patterns
Common Techniques
Effective mutation strategies implement well-established bypass techniques targeting specific filter types:
Case Variation Pattern:
Bypasses case-sensitive keyword blacklists by alternating character case:
import random

def mutate(self, payload: str) -> List[str]:
    variants = [payload]

    # Alternate case pattern
    alternated = ''.join(
        c.upper() if i % 2 == 0 else c.lower()
        for i, c in enumerate(payload)
    )
    variants.append(alternated)

    # Random case variation
    randomized = ''.join(
        c.upper() if random.random() > 0.5 else c.lower()
        for c in payload
    )
    variants.append(randomized)

    return variants
Encoding Pattern:
Applies URL, hex, or other encoding schemes to evade character-based filters:
import urllib.parse

def mutate(self, payload: str) -> List[str]:
    variants = [payload]

    # URL encoding (safe='' so that '/' is encoded as well)
    variants.append(urllib.parse.quote(payload, safe=''))

    # Double URL encoding
    once = urllib.parse.quote(payload, safe='')
    variants.append(urllib.parse.quote(once, safe=''))

    # Hex encoding: every character becomes a literal \xNN escape
    hex_encoded = ''.join(f'\\x{ord(c):02x}' for c in payload)
    variants.append(hex_encoded)

    return variants
Comment Injection Pattern:
Inserts SQL/JavaScript comments to break keyword matching:
def mutate(self, payload: str) -> List[str]:
    variants = [payload]

    # Insert SQL comments
    sql_commented = payload.replace(' ', '/**/')
    variants.append(sql_commented)

    # Insert multiple comment types
    multi_commented = payload.replace(' ', '/**/--\n')
    variants.append(multi_commented)

    return variants
Sources: examples/mutation_demo.py:14-26
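The whitespace technique follows the same shape as the patterns above. As a rough sketch (the substitute list and the function name `whitespace_variants` are assumptions, not taken from WSHawk's `WhitespaceMutator`):

```python
from typing import List

# Hypothetical replacements for a literal space: raw tab, raw newline,
# their URL-encoded forms, and '+' as used in form encoding.
SPACE_SUBSTITUTES = ["\t", "\n", "%09", "%0a", "+"]


def whitespace_variants(payload: str) -> List[str]:
    """Return the original payload plus one variant per space substitute."""
    variants = [payload]
    if " " not in payload:
        # Nothing to substitute; return only the original
        return variants
    for substitute in SPACE_SUBSTITUTES:
        candidate = payload.replace(" ", substitute)
        if candidate not in variants:
            variants.append(candidate)
    return variants
```

Payloads without spaces come back unchanged, which keeps the "at least one variant" requirement satisfied.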
Mutation Strategy Examples
Reference table of common patterns and the filter types they target:
| Strategy | Input | Output Example | Target Filter Type |
|----------|-------|----------------|-------------------|
| Case Variation | SELECT * FROM | sElEcT * FrOm | Case-sensitive keyword blacklists |
| URL Encoding | <script> | %3Cscript%3E | Character blacklists |
| Hex Encoding | <script> | \x3Cscript\x3E | Pattern-based filters |
| Comment Injection | ' OR 1=1 | '/**/OR/**/1=1 | Whitespace-sensitive parsers |
| Unicode Normalization | <script> | ＜script＞ | ASCII-only filters |
| Null Byte Injection | ' OR 1=1 | ' OR 1=1%00 | String termination exploits |
| Double Encoding | %27 | %2527 | Decode-once filters |
| Concatenation | SELECT | 'SEL'+'ECT' | Signature-based detection |
Sources: examples/mutation_demo.py:14-26
PayloadMutator Orchestrator
Orchestration Layer
The PayloadMutator class manages multiple BaseMutator instances and provides high-level mutation APIs with adaptive learning capabilities.
PayloadMutator Class Diagram:
graph TB
subgraph "PayloadMutator Class"
PayloadMutator["PayloadMutator"]
MutatePayload["mutate_payload(payload, strategy, count)"]
GenerateAdaptive["generate_adaptive_payloads(payload, max_count)"]
LearnFromResponse["learn_from_response(payload, response, is_blocked)"]
GetRecommended["get_recommended_strategy()"]
end
subgraph "Mutator Registry"
MutatorList["self.mutators: List[BaseMutator]"]
StrategyMap["strategy -> mutator mapping"]
end
subgraph "Adaptive Learning"
StrategyStats["strategy_success_rate: Dict"]
BlockedStrategies["blocked_strategies: Set"]
RecommendationEngine["Strategy recommendation logic"]
end
subgraph "Scanner Usage"
TestMethods["test_*_v2() in WSHawkV2"]
MutatedPayloads["mutated_payloads = mutator.generate_adaptive_payloads()"]
FeedbackLoop["mutator.learn_from_response(payload, response, blocked)"]
end
PayloadMutator --> MutatePayload
PayloadMutator --> GenerateAdaptive
PayloadMutator --> LearnFromResponse
PayloadMutator --> GetRecommended
MutatePayload --> MutatorList
GenerateAdaptive --> StrategyMap
LearnFromResponse --> StrategyStats
GetRecommended --> RecommendationEngine
RecommendationEngine --> StrategyStats
RecommendationEngine --> BlockedStrategies
TestMethods --> MutatedPayloads
TestMethods --> FeedbackLoop
Sources: examples/mutation_demo.py:8-62
Key Methods
mutate_payload(payload, strategy, count):
Generates a specific number of variants using a single mutation strategy:
# From examples/mutation_demo.py
from wshawk.payload_mutator import PayloadMutator, MutationStrategy

mutator = PayloadMutator()
xss_payload = "<script>alert(document.cookie)</script>"

# Generate 3 variants using encoding strategy
variants = mutator.mutate_payload(xss_payload, MutationStrategy.ENCODING, count=3)
for v in variants:
    print(v)

# Output:
# %3Cscript%3Ealert%28document.cookie%29%3C%2Fscript%3E
# \x3Cscript\x3Ealert\x28document.cookie\x29\x3C\x2Fscript\x3E
# ...
generate_adaptive_payloads(payload, max_count):
Automatically selects optimal mutation strategies based on historical success rates:
# From examples/mutation_demo.py
sqli_payload = "' OR 1=1--"
adaptive_variants = mutator.generate_adaptive_payloads(sqli_payload, max_count=10)
for i, variant in enumerate(adaptive_variants, 1):
    print(f"{i:2d}. {variant}")

# Generates up to 10 variants using recommended strategies
learn_from_response(payload, response, is_blocked, response_time):
Updates internal strategy statistics based on server responses:
# From examples/mutation_demo.py
mutator.learn_from_response(
    payload="<script>alert(1)</script>",
    response="403 Forbidden - Blocked by Cloudflare",
    is_blocked=True,
    response_time=0.02,
)

# After learning, strategy recommendations adapt
recommended = mutator.get_recommended_strategy()
print(f"Recommended strategy: {recommended.value}")
Sources: examples/mutation_demo.py:14-62
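The adaptive behaviour described above can be pictured as per-strategy bookkeeping of blocked versus unblocked attempts. The sketch below is one plausible way to do this and is not WSHawk's implementation: `DemoStrategy`, `StrategyTracker`, and the neutral prior of 0.5 for untried strategies are all assumptions, and unlike `learn_from_response()` the tracker takes the strategy directly as an argument.

```python
from enum import Enum
from typing import Dict


class DemoStrategy(Enum):
    """Hypothetical subset of strategies, for illustration only."""
    CASE_VARIATION = "case_variation"
    ENCODING = "encoding"
    COMMENT_INJECTION = "comment_injection"


class StrategyTracker:
    """Tracks how often each strategy's payloads got past the filter."""

    def __init__(self) -> None:
        self.attempts: Dict[DemoStrategy, int] = {}
        self.passed: Dict[DemoStrategy, int] = {}

    def record(self, strategy: DemoStrategy, is_blocked: bool) -> None:
        """Update counters after one response has been classified."""
        self.attempts[strategy] = self.attempts.get(strategy, 0) + 1
        if not is_blocked:
            self.passed[strategy] = self.passed.get(strategy, 0) + 1

    def success_rate(self, strategy: DemoStrategy) -> float:
        """Fraction of unblocked attempts; untried strategies get a neutral 0.5."""
        tried = self.attempts.get(strategy, 0)
        if tried == 0:
            return 0.5
        return self.passed.get(strategy, 0) / tried

    def recommend(self) -> DemoStrategy:
        """Pick the strategy with the highest success rate (first one wins ties)."""
        return max(DemoStrategy, key=self.success_rate)
```

With this scheme, a strategy that is blocked repeatedly drops below untried strategies, so the recommendation naturally moves on to techniques the filter has not yet been tested against.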
Scanner Integration
Integration Points
Mutation strategies integrate with the WSHawkV2 scanner at multiple points during the scanning lifecycle:
Scanner-Mutator Integration Diagram:
graph TB
subgraph "Scanner Initialization"
ScannerInit["WSHawkV2.__init__()"]
CreateMutators["create_default_mutators()"]
PayloadMutatorInit["PayloadMutator(mutators)"]
end
subgraph "Test Execution Phase"
TestMethod["test_sqli_v2(ws)"]
LoadPayloads["payloads = WSPayloads.get_sql_injection()"]
GenerateMutations["mutated = mutator.generate_adaptive_payloads()"]
SendPayloads["await ws.send(mutated_payload)"]
end
subgraph "Response Processing"
ReceiveResponse["response = await ws.recv()"]
WAFCheck["waf_detector.detect_waf(response)"]
LearnFromResponse["mutator.learn_from_response(payload, response, blocked)"]
VerifyVuln["verifier.verify_sql_injection(response, payload)"]
end
subgraph "Adaptive Learning Loop"
UpdateStats["Update strategy success rates"]
GetRecommended["mutator.get_recommended_strategy()"]
NextIteration["Apply recommended strategy to next payload"]
end
ScannerInit --> CreateMutators
CreateMutators --> PayloadMutatorInit
PayloadMutatorInit --> TestMethod
TestMethod --> LoadPayloads
LoadPayloads --> GenerateMutations
GenerateMutations --> SendPayloads
SendPayloads --> ReceiveResponse
ReceiveResponse --> WAFCheck
WAFCheck --> LearnFromResponse
ReceiveResponse --> VerifyVuln
LearnFromResponse --> UpdateStats
UpdateStats --> GetRecommended
GetRecommended --> NextIteration
NextIteration --> GenerateMutations
Sources: examples/mutation_demo.py:29-62
Usage in Test Methods
Scanner test methods use PayloadMutator to generate evasive payload variants:
# Conceptual pattern from scanner_v2.py test methods
async def test_sqli_v2(self, ws):
    """SQL injection testing with adaptive mutations"""
    # Load base payloads
    payloads = WSPayloads.get_sql_injection()

    for base_payload in payloads:
        # Generate mutated variants
        mutated_payloads = self.mutator.generate_adaptive_payloads(
            base_payload,
            max_count=5
        )

        for payload in mutated_payloads:
            # Send payload
            await ws.send(payload)
            response = await ws.recv()

            # Learn from response
            is_blocked = self.waf_detector.detect_waf(response) is not None
            self.mutator.learn_from_response(
                payload=payload,
                response=response,
                is_blocked=is_blocked,
                response_time=0.1
            )

            # Verify vulnerability
            is_vuln, confidence, desc = self.verifier.verify_sql_injection(
                response, payload
            )
            if is_vuln:
                self.vulnerabilities.append({...})
Sources: examples/mutation_demo.py:40-62
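The conceptual pattern passes a fixed `response_time`; in practice the value would likely be measured around the send/receive pair. The following sketch shows one way to do that with `time.monotonic()`. `FakeWebSocket`, `send_and_time`, and `probe` are hypothetical helpers, and the "403" check is a toy stand-in for real WAF detection.

```python
import asyncio
import time
from typing import List, Tuple


class FakeWebSocket:
    """Hypothetical in-memory stand-in for a WebSocket connection."""

    def __init__(self) -> None:
        self._last = ""

    async def send(self, message: str) -> None:
        self._last = message

    async def recv(self) -> str:
        await asyncio.sleep(0.01)  # simulate network latency
        if "<script" in self._last.lower():
            return "403 Forbidden"  # toy filter: blocks raw script tags
        return f"echo: {self._last}"


async def send_and_time(ws, payload: str) -> Tuple[str, float]:
    """Send one payload and return (response, elapsed seconds)."""
    start = time.monotonic()
    await ws.send(payload)
    response = await ws.recv()
    return response, time.monotonic() - start


async def probe(payloads: List[str]) -> List[Tuple[str, bool, float]]:
    """Return (payload, is_blocked, response_time) for each payload."""
    ws = FakeWebSocket()
    results = []
    for payload in payloads:
        response, elapsed = await send_and_time(ws, payload)
        results.append((payload, response.startswith("403"), elapsed))
    return results
```

Running `asyncio.run(probe([...]))` yields tuples whose last two fields could be passed on as `is_blocked` and `response_time` in a feedback call.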
Testing Mutation Strategies
Development Testing
Validate mutation strategy implementations before integration:
Step 1: Create standalone test script:
#!/usr/bin/env python3
"""Test custom mutation strategy"""
from wshawk.mutators.custom_mutator import CustomMutator


def test_custom_mutator():
    mutator = CustomMutator()

    # Test with various payload types
    test_payloads = [
        "' OR 1=1--",
        "<script>alert(1)</script>",
        "${7*7}",
        "../../etc/passwd"
    ]

    for payload in test_payloads:
        variants = mutator.mutate(payload)
        print(f"\nOriginal: {payload}")
        print(f"Variants ({len(variants)}):")
        for v in variants:
            print(f"  - {v}")


if __name__ == "__main__":
    test_custom_mutator()
Step 2: Test within PayloadMutator:
from wshawk.payload_mutator import PayloadMutator
from wshawk.mutators import create_default_mutators
# Initialize with your mutator included
mutators = create_default_mutators()
mutator = PayloadMutator(mutators)
# Test adaptive generation
variants = mutator.generate_adaptive_payloads("' OR 1=1--", max_count=10)
print(f"Generated {len(variants)} adaptive variants")
Step 3: Run mutation demo example:
python examples/mutation_demo.py
This validates that your mutator integrates correctly with the orchestration layer.
Sources: examples/mutation_demo.py:1-67, CONTRIBUTING.md:52-61
Integration Testing
Test mutator behavior within full scanner context:
Step 1: Create test WebSocket server:
# Start test server (if available in tests/)
python tests/websocket_test_server.py
Step 2: Run scanner with mutation logging:
# Enable verbose logging to see mutation strategy selection
wshawk ws://localhost:8765 --verbose
Step 3: Verify mutation diversity in report:
Check wshawk_report_*.html for:
- Multiple payload variants tested
- Strategy information in vulnerability details
- Adaptive learning behavior (if logs show strategy switching)
Sources: CONTRIBUTING.md:52-61
Code Quality Standards
| Requirement | Description | Validation Method |
|-------------|-------------|-------------------|
| PEP 8 Compliance | Follow Python style guidelines | flake8 wshawk/mutators/custom_mutator.py |
| Type Hints | Annotate function signatures | def mutate(self, payload: str) -> List[str]: |
| Docstrings | Document class and methods | Include Examples section |
| Error Handling | Handle malformed input | Wrap string operations in try/except |
| Performance | Generate variants in <100ms | Test with time.time() measurements |
| Uniqueness | Return distinct variants | Deduplicate results, e.g. with dict.fromkeys() to keep order (set() loses ordering) |
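The error-handling and uniqueness rows can be combined in a small wrapper. This is a sketch under assumptions: `dedupe_preserving_order` and `safe_mutate` are hypothetical helpers, and falling back to `[payload]` mirrors the "returns [payload] on error" convention used in the docstring format on this page.

```python
from typing import Callable, List


def dedupe_preserving_order(variants: List[str]) -> List[str]:
    """Remove duplicates while keeping first-seen order (dicts preserve insertion order)."""
    return list(dict.fromkeys(variants))


def safe_mutate(mutate: Callable[[str], List[str]], payload: str) -> List[str]:
    """Call a mutate function, falling back to the original payload on any error."""
    try:
        variants = mutate(payload)
    except Exception:
        # Malformed input should not abort a scan; keep the original payload
        return [payload]
    return dedupe_preserving_order(variants)
```

Order preservation matters when the original payload is expected to stay first in the list, which a plain `set()` does not guarantee.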
Example Docstring Format:
class CustomMutator(BaseMutator):
    """
    Custom mutation strategy implementing [technique name]

    This mutator targets [specific filter type] by applying
    [transformation description].

    Examples:
        >>> mutator = CustomMutator()
        >>> mutator.mutate("' OR 1=1--")
        ["' OR 1=1--", "' oR 1=1--", "' OR 1=1--%00"]

    Attributes:
        None
    """

    def mutate(self, payload: str) -> List[str]:
        """
        Apply custom mutation to payload

        Args:
            payload: Original attack vector string

        Returns:
            List of mutated variants (minimum 1, typically 3-10)

        Raises:
            None (returns [payload] on error)
        """
Sources: CONTRIBUTING.md:43-49
Submission Process
Pull Request Checklist
Before submitting extensions:
- [ ] All tests pass (python tests/test_modules_quick.py)
- [ ] Code follows PEP 8 style guidelines
- [ ] Type hints added to new functions
- [ ] Docstrings added to all classes/methods
- [ ] Error handling implemented for network operations
- [ ] Integration point updated (scanner, mutator registry, or WSPayloads)
- [ ] Example usage documented in docstrings
- [ ] CVSS scoring included for vulnerability modules
- [ ] Rate limiting respected in test methods
Documentation Requirements
Include in pull request description:
- Purpose: What vulnerability/technique does this extension detect/implement?
- Integration: How does it integrate with existing scanner components?
- Testing: What testing was performed? Include example output.
- Dependencies: Any new dependencies required?
- Breaking Changes: Does this modify existing interfaces?
Sources: CONTRIBUTING.md:85-91
Review Process
Extension submissions undergo:
- Automated CI/CD Checks - Build verification and test execution
- Code Review - Maintainer evaluation of implementation quality
- Security Review - Assessment of potential security implications
- Integration Testing - Verification of scanner compatibility
Expect 3-7 days for initial review feedback.
Sources: CONTRIBUTING.md:70-91, Diagram 6 from architecture overview