Python API
The following files were used as context for generating this wiki page:
- .github/workflows/ghcr-publish.yml
- README.md
- requirements.txt
- wshawk/__init__.py
- wshawk/advanced_cli.py
- wshawk/scanner_v2.py
This document covers programmatic usage of WSHawk through its Python API. It focuses on the WSHawkV2 class for offensive scanning, the DefensiveValidator class for blue team validation, and the PayloadMutator class for custom payload generation. For CLI-based usage, see Quick Start Examples. For detailed scanner architecture, see WSHawkV2 Scanner Engine. For configuration files and authentication sequences, see Configuration and Authentication.
API Entry Points
WSHawk provides three primary Python classes for programmatic integration:
graph TB
subgraph "Public API Classes"
WSHawkV2["WSHawkV2<br/>wshawk.scanner_v2"]
DefensiveValidator["DefensiveValidator<br/>wshawk.defensive_validation"]
PayloadMutator["PayloadMutator<br/>wshawk.payload_mutator"]
end
subgraph "Core Modules (Auto-initialized)"
MessageAnalyzer["MessageAnalyzer<br/>message_intelligence.py"]
VulnerabilityVerifier["VulnerabilityVerifier<br/>vulnerability_verifier.py"]
ServerFingerprinter["ServerFingerprinter<br/>server_fingerprint.py"]
SessionStateMachine["SessionStateMachine<br/>state_machine.py"]
TokenBucketRateLimiter["TokenBucketRateLimiter<br/>rate_limiter.py"]
EnhancedHTMLReporter["EnhancedHTMLReporter<br/>enhanced_reporter.py"]
end
subgraph "Optional Components"
HeadlessBrowserXSSVerifier["HeadlessBrowserXSSVerifier<br/>headless_xss_verifier.py"]
OASTProvider["OASTProvider<br/>oast_provider.py"]
SessionHijackingTester["SessionHijackingTester<br/>session_hijacking_tester.py"]
end
WSHawkV2 --> MessageAnalyzer
WSHawkV2 --> VulnerabilityVerifier
WSHawkV2 --> ServerFingerprinter
WSHawkV2 --> SessionStateMachine
WSHawkV2 --> TokenBucketRateLimiter
WSHawkV2 --> EnhancedHTMLReporter
WSHawkV2 -.optional.-> HeadlessBrowserXSSVerifier
WSHawkV2 -.optional.-> OASTProvider
WSHawkV2 -.optional.-> SessionHijackingTester
DefensiveValidator --> MessageAnalyzer
PayloadMutator -.standalone.-> WSHawkV2
Sources: wshawk/scanner_v2.py:1-77, examples/basic_scan.py:1-41, examples/defensive_check.py:1-43, examples/mutation_demo.py:1-67
WSHawkV2 Class
The WSHawkV2 class is the primary entry point for offensive security scanning. It orchestrates all vulnerability detection modules and provides comprehensive WebSocket security testing.
Basic Initialization
from wshawk.scanner_v2 import WSHawkV2

# Minimal initialization
scanner = WSHawkV2("ws://target.com")

# With custom headers
scanner = WSHawkV2(
    "ws://target.com",
    headers={"Authorization": "Bearer token123"}
)

# With rate limiting
scanner = WSHawkV2(
    "ws://target.com",
    max_rps=5  # 5 requests per second
)

# With authentication sequence
scanner = WSHawkV2(
    "ws://target.com",
    auth_sequence="path/to/auth.yaml"
)
Constructor Parameters:
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| url | str | Required | WebSocket target URL (ws:// or wss://) |
| headers | Dict[str, str] | None | Custom HTTP headers for WebSocket handshake |
| auth_sequence | str | None | Path to YAML file with authentication message sequence |
| max_rps | int | 10 | Maximum requests per second for rate limiting |
Sources: wshawk/scanner_v2.py:28-76, README.md:209-223, examples/basic_scan.py:13-22
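Because the url parameter must use the ws:// or wss:// scheme, it can help to reject malformed targets before constructing a scanner. The following is a minimal sketch using only the standard library; validate_ws_url is a hypothetical helper and is not part of WSHawk.

```python
from urllib.parse import urlparse


def validate_ws_url(url: str) -> str:
    """Return url unchanged if it looks like a WebSocket endpoint.

    Hypothetical pre-flight check, not a WSHawk function.
    """
    parsed = urlparse(url)
    # Only the two WebSocket schemes are accepted.
    if parsed.scheme not in ("ws", "wss"):
        raise ValueError(f"Expected a ws:// or wss:// URL, got: {url!r}")
    # A scheme without a host cannot be connected to.
    if not parsed.hostname:
        raise ValueError(f"URL has no host: {url!r}")
    return url
```

The validated string can then be passed to the constructor, for example WSHawkV2(validate_ws_url(target)).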
Configuring Scanner Behavior
Feature Flags
The scanner supports optional features that can be enabled or disabled before running:
scanner = WSHawkV2("ws://target.com")
# Enable/disable Playwright browser verification
scanner.use_headless_browser = True # Default: True
# Enable/disable OAST blind vulnerability detection
scanner.use_oast = True # Default: True
Feature Flag Reference:
| Property | Type | Default | Purpose |
|----------|------|---------|---------|
| use_headless_browser | bool | True | Enables real browser XSS verification via Playwright |
| use_oast | bool | True | Enables Out-of-Band testing for blind XXE/SSRF |
Sources: wshawk/scanner_v2.py:52-58, examples/basic_scan.py:19-21
Accessing Internal Components
All core modules are accessible as public attributes after initialization:
scanner = WSHawkV2("ws://target.com")
# Access message analyzer for custom logic
analyzer = scanner.message_analyzer
# Access verifier for custom verification
verifier = scanner.verifier
# Access rate limiter stats
rate_stats = scanner.rate_limiter.get_stats()
# Access fingerprint data
fingerprint_info = scanner.fingerprinter.get_info()
Available Attributes:
| Attribute | Type | Description |
|-----------|------|-------------|
| message_analyzer | MessageAnalyzer | Detects message formats (JSON/XML/Binary) and injectable fields |
| verifier | VulnerabilityVerifier | Automated vulnerability verification with confidence levels |
| fingerprinter | ServerFingerprinter | Server technology detection (language, framework, database) |
| state_machine | SessionStateMachine | WebSocket connection state tracking |
| rate_limiter | TokenBucketRateLimiter | Adaptive rate limiting controller |
| reporter | EnhancedHTMLReporter | HTML report generator |
Sources: wshawk/scanner_v2.py:40-76
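To make the role of the rate limiter more concrete, here is a minimal token bucket sketch. It illustrates the general technique behind a max_rps style limit only; SimpleTokenBucket and its parameters are hypothetical, and WSHawk's TokenBucketRateLimiter is described as adaptive, which this sketch does not attempt.

```python
import time


class SimpleTokenBucket:
    """Illustrative token bucket (not WSHawk's implementation)."""

    def __init__(self, rate: float, capacity: float, clock=time.monotonic):
        self.rate = rate          # tokens added per second
        self.capacity = capacity  # maximum burst size
        self.tokens = capacity    # start with a full bucket
        self.clock = clock        # injectable clock, useful for testing
        self.last = clock()

    def try_acquire(self) -> bool:
        """Consume one token if available; return False otherwise."""
        now = self.clock()
        # Refill in proportion to elapsed time, capped at capacity.
        elapsed = now - self.last
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False
```

A caller would check try_acquire() before each send and wait briefly when it returns False. The injectable clock keeps the class deterministic under test.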
Running Scans
Complete Heuristic Scan
The run_heuristic_scan() method executes all 11 vulnerability tests in sequence:
import asyncio
from wshawk.scanner_v2 import WSHawkV2
async def scan():
    scanner = WSHawkV2("ws://target.com")
    # Runs: SQLi, XSS, Command Injection, Path Traversal,
    # XXE, NoSQL, SSRF, and Session Hijacking tests
    vulnerabilities = await scanner.run_heuristic_scan()
    return vulnerabilities
# Execute
results = asyncio.run(scan())
Scan Lifecycle:
stateDiagram-v2
[*] --> Initialize
Initialize --> Connect: scanner.run_heuristic_scan()
Connect --> LearningPhase: await connect()
LearningPhase --> MessageAnalysis: Observe 5s of traffic
MessageAnalysis --> Fingerprinting: Detect format/fields
Fingerprinting --> VulnerabilityTests: Identify tech stack
VulnerabilityTests --> SQLInjection
SQLInjection --> XSS
XSS --> CommandInjection
CommandInjection --> PathTraversal
PathTraversal --> XXE
XXE --> NoSQL
NoSQL --> SSRF
SSRF --> SessionTests
SessionTests --> Cleanup
Cleanup --> ReportGeneration
ReportGeneration --> [*]: Return vulnerabilities[]
Sources: wshawk/scanner_v2.py:542-677, examples/basic_scan.py:25-26
Individual Test Methods
Each vulnerability type has a dedicated async method that can be called independently:
async def custom_scan():
    scanner = WSHawkV2("ws://target.com")
    # Connect manually
    ws = await scanner.connect()
    if not ws:
        return
    # Run learning phase
    await scanner.learning_phase(ws, duration=5)
    # Run specific tests
    sqli_results = await scanner.test_sql_injection_v2(ws)
    xss_results = await scanner.test_xss_v2(ws)
    # Close connection
    await ws.close()
    return scanner.vulnerabilities
Available Test Methods:
| Method | Line Reference | Vulnerabilities Detected |
|--------|---------------|--------------------------|
| test_sql_injection_v2(ws) | scanner_v2.py:143-210 | SQL Injection with database-specific payloads |
| test_xss_v2(ws) | scanner_v2.py:212-290 | Cross-Site Scripting with context analysis |
| test_command_injection_v2(ws) | scanner_v2.py:292-356 | OS Command Injection with timing attacks |
| test_path_traversal_v2(ws) | scanner_v2.py:358-397 | Directory Traversal / LFI |
| test_xxe_v2(ws) | scanner_v2.py:399-453 | XML External Entity with OAST |
| test_nosql_injection_v2(ws) | scanner_v2.py:455-493 | NoSQL Injection (MongoDB, etc.) |
| test_ssrf_v2(ws) | scanner_v2.py:495-540 | Server-Side Request Forgery |
Sources: wshawk/scanner_v2.py:143-540
Working with Results
Vulnerability Data Structure
All detected vulnerabilities are appended to scanner.vulnerabilities as dictionaries:
vulnerabilities = await scanner.run_heuristic_scan()
for vuln in vulnerabilities:
    print(f"Type: {vuln['type']}")
    print(f"Severity: {vuln['severity']}")
    print(f"Confidence: {vuln['confidence']}")
    print(f"Description: {vuln['description']}")
    print(f"Payload: {vuln['payload']}")
    print(f"Response: {vuln['response_snippet']}")
    print(f"Fix: {vuln['recommendation']}")
    print()
Vulnerability Dictionary Schema:
| Key | Type | Description |
|-----|------|-------------|
| type | str | Vulnerability category (e.g., "SQL Injection", "XSS") |
| severity | str | Risk level: "CRITICAL", "HIGH", "MEDIUM", "LOW" |
| confidence | str | Detection confidence: "CRITICAL", "HIGH", "MEDIUM", "LOW" |
| description | str | Human-readable explanation of the finding |
| payload | str | Attack payload that triggered the vulnerability |
| response_snippet | str | First 200 characters of server response |
| recommendation | str | Remediation guidance |
| browser_verified | bool | (XSS only) Whether Playwright confirmed execution |
| cvss_score | float | (Session tests only) CVSS v3.1 score |
Sources: wshawk/scanner_v2.py:190-199, wshawk/scanner_v2.py:270-280, examples/basic_scan.py:32-36
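Since findings are plain dictionaries, they can be post-processed with standard library tools. The sketch below assumes only the keys listed in the schema above; summarize_findings and findings_to_json are hypothetical helper names, not WSHawk functions.

```python
import json
from collections import Counter

SEVERITY_LEVELS = ("CRITICAL", "HIGH", "MEDIUM", "LOW")


def summarize_findings(vulnerabilities):
    """Count findings per severity level, including levels with zero hits."""
    counts = Counter(v.get("severity", "UNKNOWN") for v in vulnerabilities)
    return {level: counts.get(level, 0) for level in SEVERITY_LEVELS}


def findings_to_json(vulnerabilities) -> str:
    """Serialize findings; default=str covers values that are not JSON-native."""
    return json.dumps(vulnerabilities, indent=2, default=str)


# Illustrative data shaped like the schema above (values are made up).
sample = [
    {"type": "SQL Injection", "severity": "HIGH", "confidence": "MEDIUM"},
    {"type": "XSS", "severity": "HIGH", "confidence": "CRITICAL"},
    {"type": "SSRF", "severity": "LOW", "confidence": "LOW"},
]
summary = summarize_findings(sample)
```

The same list can be passed to findings_to_json to archive results alongside the HTML report.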
Statistics and Metrics
Access scan metrics through scanner attributes:
await scanner.run_heuristic_scan()
# Request/response counts
print(f"Sent: {scanner.messages_sent}")
print(f"Received: {scanner.messages_received}")
# Timing
duration = (scanner.end_time - scanner.start_time).total_seconds()
print(f"Duration: {duration:.2f}s")
# Rate limiter stats
stats = scanner.rate_limiter.get_stats()
print(f"Total requests: {stats['total_requests']}")
print(f"Wait events: {stats['total_waits']}")
print(f"Current rate: {stats['current_rate']} req/s")
print(f"Adaptive adjustments: {stats['adaptive_adjustments']}")
Sources: wshawk/scanner_v2.py:65-75, wshawk/scanner_v2.py:631-675
Advanced Features
Playwright XSS Verification
Enable browser-based XSS verification to eliminate false positives:
scanner = WSHawkV2("ws://target.com")
scanner.use_headless_browser = True # Default is True
# The scanner automatically initializes Playwright when needed
vulnerabilities = await scanner.run_heuristic_scan()
# Check which XSS findings were browser-verified
for vuln in vulnerabilities:
    if vuln['type'] == 'Cross-Site Scripting (XSS)':
        if vuln.get('browser_verified'):
            print(f"REAL XSS: {vuln['payload']}")
How Browser Verification Works:
sequenceDiagram
participant Scanner as scanner.test_xss_v2()
participant Verifier as VulnerabilityVerifier
participant Browser as HeadlessBrowserXSSVerifier
participant Playwright as Playwright API
Scanner->>Verifier: verify_xss(response, payload)
Verifier->>Scanner: confidence=HIGH (pattern matched)
Scanner->>Browser: verify_xss_execution(response, payload)
Browser->>Playwright: page.goto(html_with_payload)
Playwright->>Playwright: Wait for console.log/alert
Playwright->>Browser: Event captured
Browser->>Scanner: is_executed=True, evidence=...
Scanner->>Scanner: Set confidence=CRITICAL<br/>browser_verified=True
Sources: wshawk/scanner_v2.py:248-269, wshawk/scanner_v2.py:52-54
OAST Blind Vulnerability Detection
Enable Out-of-Band Application Security Testing for blind XXE and SSRF:
scanner = WSHawkV2("ws://target.com")
scanner.use_oast = True # Default is True
# OAST provider automatically registers with interact.sh
vulnerabilities = await scanner.run_heuristic_scan()
# The scanner polls for callbacks after sending payloads
# Blind vulnerabilities are detected via DNS/HTTP interactions
OAST Integration Flow:
sequenceDiagram
participant Scanner as scanner.test_xxe_v2()
participant OAST as OASTProvider
participant InteractSH as interact.sh
participant Target as WebSocket Server
Scanner->>OAST: start()
OAST->>InteractSH: Register unique domain
InteractSH->>OAST: xxxx.oast.fun
Scanner->>OAST: generate_payload('xxe', 'test1')
OAST->>Scanner: <!ENTITY xxe SYSTEM "http://test1.xxxx.oast.fun">
Scanner->>Target: Send XXE payload
Target->>InteractSH: DNS query for test1.xxxx.oast.fun
Scanner->>OAST: poll_interactions()
OAST->>InteractSH: Check for callbacks
InteractSH->>OAST: [DNS query from X.X.X.X]
OAST->>Scanner: Blind XXE confirmed!
Sources: wshawk/scanner_v2.py:56-58, wshawk/scanner_v2.py:406-451
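The correlation idea in the flow above (embed a unique label in the callback host, then match incoming interactions back to the payload that carried it) can be sketched without any network access. Everything here is an assumption made for illustration: the function names, the payload template, and the shape of the interaction records (dictionaries with a host key) are hypothetical and do not describe OASTProvider's real interface.

```python
def make_callback_host(base_domain: str, tag: str) -> str:
    """Prefix the registered OAST domain with a per-payload label."""
    return f"{tag}.{base_domain}"


def build_xxe_payload(callback_host: str) -> str:
    """Build an XML document whose external entity points at the callback host."""
    return (
        '<?xml version="1.0"?>'
        f'<!DOCTYPE data [<!ENTITY xxe SYSTEM "http://{callback_host}/">]>'
        "<data>&xxe;</data>"
    )


def correlate_interactions(interactions, tags):
    """Group interaction records by the payload tag found in their first DNS label."""
    hits = {tag: [] for tag in tags}
    for item in interactions:
        label = item["host"].split(".", 1)[0]
        if label in hits:
            hits[label].append(item)
    return hits


host = make_callback_host("xxxx.oast.fun", "test1")
payload = build_xxe_payload(host)
# Hypothetical record, as a polling step might return it.
observed = [{"host": "test1.xxxx.oast.fun", "protocol": "dns"}]
matches = correlate_interactions(observed, ["test1", "test2"])
```

A tag with at least one matching interaction indicates that the corresponding payload triggered an out-of-band request, even though the WebSocket response itself showed nothing.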
Session Hijacking Tests
Session security tests run automatically at the end of run_heuristic_scan(). The SessionHijackingTester class can also be used directly:
# Manual session testing
from wshawk.session_hijacking_tester import SessionHijackingTester
async def test_sessions():
    tester = SessionHijackingTester("ws://target.com")
    # Run all 6 session security tests
    results = await tester.run_all_tests()
    for result in results:
        if result.is_vulnerable:
            print(f"[{result.confidence}] {result.vuln_type.value}")
            print(f"  {result.description}")
            print(f"  CVSS: {result.cvss_score}")
            print(f"  Fix: {result.recommendation}")
Sources: wshawk/scanner_v2.py:589-613
Defensive Validation API
The DefensiveValidator class provides blue team security control validation:
import asyncio
from wshawk.defensive_validation import DefensiveValidator

async def validate_defenses():
    validator = DefensiveValidator("ws://your-server.com")
    # Run all 4 defensive checks
    results = await validator.run_all_checks()
    # Process results
    for check in results:
        status = "PASS" if check["passed"] else "FAIL"
        print(f"[{status}] {check['name']}")
        print(f"  Score: {check.get('cvss_score', 'N/A')}")
        print(f"  Details: {check['details']}")
        if not check["passed"]:
            print(f"  Recommendation: {check['recommendation']}")
    # Calculate pass rate
    passed = sum(1 for c in results if c["passed"])
    total = len(results)
    print(f"\nDefensive Posture: {passed}/{total} checks passed")

asyncio.run(validate_defenses())
Defensive Checks:
| Method | Test Category | Description |
|--------|---------------|-------------|
| test_dns_exfiltration_prevention() | DNS Egress | Validates egress filtering blocks DNS-based exfiltration |
| test_bot_detection() | Anti-Bot | Tests if headless browser detection works |
| test_cswsh() | Origin Validation | Tests 216+ malicious origins for CSRF protection |
| test_wss_security() | TLS/SSL | Validates protocol version, ciphers, certificates |
Sources: examples/defensive_check.py:1-43
Payload Mutation API
The PayloadMutator class provides standalone payload generation and WAF evasion:
from wshawk.payload_mutator import PayloadMutator, MutationStrategy
def generate_payloads():
    mutator = PayloadMutator()
    # Generate variants using specific strategy
    base_payload = "<script>alert(1)</script>"
    variants = mutator.mutate_payload(
        base_payload,
        MutationStrategy.CASE_VARIATION,
        count=5
    )
    for variant in variants:
        print(variant)
    # Adaptive payload generation (auto-selects strategies)
    adaptive = mutator.generate_adaptive_payloads(
        base_payload,
        max_count=20
    )
    # Learn from WAF responses
    mutator.learn_from_response(
        payload="<script>alert(1)</script>",
        response="403 Forbidden - Cloudflare",
        is_blocked=True,
        response_time=0.02
    )
    # Get recommended strategy after learning
    recommended = mutator.get_recommended_strategy()
    print(f"Recommended: {recommended.value}")
Available Mutation Strategies:
| Strategy | Enum Value | Use Case |
|----------|-----------|----------|
| Case Variation | CASE_VARIATION | Bypass case-sensitive filters |
| URL Encoding | URL_ENCODING | Evade signature-based WAFs |
| Unicode | UNICODE_ENCODING | Bypass charset filters |
| HTML Encoding | HTML_ENTITY | Evade HTML parsers |
| Comment Injection | COMMENT_INJECTION | Break signature patterns |
| Whitespace | WHITESPACE_VARIATION | Bypass regex anchors |
| Concatenation | CONCATENATION | Evade string matching |
| Null Bytes | NULL_BYTE | Exploit C-style parsing |
Sources: examples/mutation_demo.py:1-67, wshawk/payload_mutator.py
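To show what three of the strategies in the table do to a payload, here is a small standalone sketch. It illustrates the general transformations only and is not taken from payload_mutator.py; the function names are hypothetical, and the real PayloadMutator may produce different variants.

```python
from urllib.parse import quote


def case_variation(payload: str) -> str:
    """Alternate upper and lower case across alphabetic characters."""
    out = []
    upper = True
    for ch in payload:
        if ch.isalpha():
            out.append(ch.upper() if upper else ch.lower())
            upper = not upper
        else:
            out.append(ch)
    return "".join(out)


def url_encode(payload: str) -> str:
    """Percent-encode every reserved character, including slashes."""
    return quote(payload, safe="")


def html_entity_encode(payload: str) -> str:
    """Replace each character with its decimal HTML character reference."""
    return "".join(f"&#{ord(ch)};" for ch in payload)


base = "<script>"
variants = {
    "CASE_VARIATION": case_variation(base),
    "URL_ENCODING": url_encode(base),
    "HTML_ENTITY": html_entity_encode(base),
}
```

Each variant decodes or normalizes back to the same base string, which is why a filter that matches only the literal form can miss it.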
Integration Patterns
CI/CD Integration
import asyncio
import sys
from wshawk.scanner_v2 import WSHawkV2

async def ci_scan(target, max_severity="MEDIUM"):
    scanner = WSHawkV2(target, max_rps=5)
    scanner.use_headless_browser = False  # Faster for CI
    vulns = await scanner.run_heuristic_scan()
    # Filter by severity: fail on findings at or above the threshold
    severity_order = {"CRITICAL": 4, "HIGH": 3, "MEDIUM": 2, "LOW": 1}
    threshold = severity_order.get(max_severity, 2)
    failures = [
        v for v in vulns
        if severity_order.get(v['severity'], 0) >= threshold
    ]
    if failures:
        print(f"FAIL: {len(failures)} vulnerabilities >= {max_severity}")
        for v in failures:
            print(f"  [{v['severity']}] {v['type']}")
        sys.exit(1)
    else:
        print(f"PASS: No vulnerabilities >= {max_severity}")
        sys.exit(0)

asyncio.run(ci_scan("ws://staging.example.com", "HIGH"))
Sources: examples/basic_scan.py:1-41
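The gating logic is easier to unit test when it is separated from the scan itself. The sketch below is one possible factoring, assuming findings carry the severity key from the vulnerability schema; findings_at_or_above and exit_code are hypothetical names.

```python
SEVERITY_ORDER = {"LOW": 1, "MEDIUM": 2, "HIGH": 3, "CRITICAL": 4}


def findings_at_or_above(vulnerabilities, threshold="MEDIUM"):
    """Return findings whose severity is at least the given threshold."""
    floor = SEVERITY_ORDER[threshold]
    return [
        v for v in vulnerabilities
        # Unknown or missing severities rank below every threshold.
        if SEVERITY_ORDER.get(v.get("severity"), 0) >= floor
    ]


def exit_code(vulnerabilities, threshold="MEDIUM") -> int:
    """Map scan results to a process exit code: 1 fails the pipeline."""
    return 1 if findings_at_or_above(vulnerabilities, threshold) else 0


# Illustrative findings (values are made up).
findings = [
    {"type": "XSS", "severity": "MEDIUM"},
    {"type": "SQL Injection", "severity": "CRITICAL"},
]
```

With this split, the pipeline script only needs to call sys.exit(exit_code(vulns, "HIGH")) after the scan completes.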
Custom Payload Integration
import asyncio
from wshawk.scanner_v2 import WSHawkV2

async def custom_payloads_scan():
    scanner = WSHawkV2("ws://target.com")
    # Connect and learn
    ws = await scanner.connect()
    if not ws:
        return
    await scanner.learning_phase(ws, duration=5)
    # Custom SQL injection payloads
    custom_sqli = [
        "1' AND SLEEP(5)--",
        "' OR '1'='1' UNION SELECT password FROM users--",
        "admin'--"
    ]
    # Inject custom payloads
    for payload in custom_sqli:
        # Use message analyzer to inject into correct field
        injected = scanner.message_analyzer.inject_payload_into_message(
            scanner.sample_messages[0],
            payload
        )
        for msg in injected:
            await ws.send(msg)
            try:
                response = await asyncio.wait_for(ws.recv(), timeout=2.0)
            except asyncio.TimeoutError:
                continue  # No reply within 2 seconds; try the next message
            # Use verifier for automated analysis
            is_vuln, confidence, desc = scanner.verifier.verify_sql_injection(
                response, payload
            )
            if is_vuln:
                print(f"[{confidence.value}] {desc}")
                print(f"  Payload: {payload}")
    await ws.close()
Sources: wshawk/scanner_v2.py:143-210
Multi-Target Scanning
import asyncio
from wshawk.scanner_v2 import WSHawkV2

async def scan_multiple_targets(targets):
    results = {}
    for target in targets:
        scanner = WSHawkV2(target, max_rps=3)
        scanner.use_headless_browser = False
        try:
            vulns = await scanner.run_heuristic_scan()
            results[target] = {
                "success": True,
                "vulnerabilities": len(vulns),
                "critical": sum(1 for v in vulns if v['confidence'] == 'CRITICAL'),
                "high": sum(1 for v in vulns if v['confidence'] == 'HIGH')
            }
        except Exception as e:
            results[target] = {
                "success": False,
                "error": str(e)
            }
    return results

# Scan the targets one after another
targets = [
    "ws://api1.example.com",
    "ws://api2.example.com",
    "ws://api3.example.com"
]
results = asyncio.run(scan_multiple_targets(targets))
Sources: wshawk/scanner_v2.py:28-77
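The loop in scan_multiple_targets awaits each target in turn. If overlapping scans are wanted, one common approach is asyncio.gather with a semaphore to cap concurrency. The sketch below shows that pattern with a stand-in coroutine; scan_concurrently and fake_scan are hypothetical, and whether several WSHawkV2 instances can safely run side by side is an assumption that should be checked against the scanner itself.

```python
import asyncio


async def scan_concurrently(targets, scan_one, max_concurrent=2):
    """Run scan_one(target) for each target, at most max_concurrent at a time."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def guarded(target):
        async with semaphore:
            try:
                return target, {"success": True, "result": await scan_one(target)}
            except Exception as exc:
                # One failing target should not abort the others.
                return target, {"success": False, "error": str(exc)}

    pairs = await asyncio.gather(*(guarded(t) for t in targets))
    return dict(pairs)


async def fake_scan(target):
    """Stand-in for a real scan so the sketch runs without a server."""
    await asyncio.sleep(0)
    if "bad" in target:
        raise ConnectionRefusedError("refused")
    return []


results = asyncio.run(
    scan_concurrently(["ws://a.example", "ws://bad.example"], fake_scan)
)
```

In real use, scan_one would wrap the construction of a scanner and the call to run_heuristic_scan() for a single target.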
Error Handling
All async methods may raise exceptions that should be handled:
import asyncio
import websockets
from wshawk.scanner_v2 import WSHawkV2

async def robust_scan():
    scanner = WSHawkV2("ws://target.com")
    try:
        results = await scanner.run_heuristic_scan()
        return results
    except websockets.exceptions.InvalidURI as e:
        print(f"Invalid WebSocket URL: {e}")
    except websockets.exceptions.InvalidHandshake as e:
        print(f"Handshake failed (check auth/headers): {e}")
    except ConnectionRefusedError:
        print("Connection refused - server not reachable")
    except asyncio.TimeoutError:
        print("Scan timeout - server not responding")
    except Exception as e:
        print(f"Unexpected error: {e}")
    finally:
        # Cleanup verification resources
        if scanner.headless_verifier:
            await scanner.headless_verifier.stop()
        if scanner.oast_provider:
            await scanner.oast_provider.stop()
Common Exceptions:
| Exception | Cause | Solution |
|-----------|-------|----------|
| websockets.exceptions.InvalidURI | Malformed WebSocket URL | Verify URL format (ws:// or wss://) |
| websockets.exceptions.InvalidHandshake | Handshake rejected (e.g., authentication failure) | Check headers, credentials, and the Origin header |
| ConnectionRefusedError | Target unreachable | Verify network connectivity, firewall rules |
| asyncio.TimeoutError | Slow server responses | Increase timeout in individual test methods |
| playwright.async_api.Error | Browser automation failure | Run playwright install chromium before scanning |
Sources: wshawk/scanner_v2.py:77-85, wshawk/scanner_v2.py:615-628
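Transient failures such as ConnectionRefusedError or asyncio.TimeoutError are often worth retrying before giving up. The following is a minimal retry helper with exponential backoff; retry_async and its parameters are hypothetical and not part of WSHawk.

```python
import asyncio


async def retry_async(
    operation,
    attempts=3,
    base_delay=0.5,
    retry_on=(ConnectionRefusedError, asyncio.TimeoutError),
):
    """Await operation() up to `attempts` times, doubling the delay each retry."""
    for attempt in range(1, attempts + 1):
        try:
            return await operation()
        except retry_on:
            if attempt == attempts:
                raise  # Out of attempts: surface the last error to the caller
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))


# Demonstration with a coroutine that fails twice, then succeeds.
calls = {"n": 0}


async def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionRefusedError("not yet")
    return "ok"


outcome = asyncio.run(retry_async(flaky, attempts=3, base_delay=0))
```

Errors that indicate a configuration problem, such as InvalidURI, are deliberately left out of retry_on because retrying would not change the result.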