Python API

Python API

The following files were used as context for generating this wiki page:

This document covers programmatic usage of WSHawk through its Python API. It focuses on the WSHawkV2 class for offensive scanning, the DefensiveValidator class for blue team validation, and the PayloadMutator class for custom payload generation. For CLI-based usage, see Quick Start Examples. For detailed scanner architecture, see WSHawkV2 Scanner Engine. For configuration files and authentication sequences, see Configuration and Authentication.


API Entry Points

WSHawk provides three primary Python classes for programmatic integration:

graph TB
    subgraph "Public API Classes"
        WSHawkV2["WSHawkV2<br/>wshawk.scanner_v2"]
        DefensiveValidator["DefensiveValidator<br/>wshawk.defensive_validation"]
        PayloadMutator["PayloadMutator<br/>wshawk.payload_mutator"]
    end
    
    subgraph "Core Modules (Auto-initialized)"
        MessageAnalyzer["MessageAnalyzer<br/>message_intelligence.py"]
        VulnerabilityVerifier["VulnerabilityVerifier<br/>vulnerability_verifier.py"]
        ServerFingerprinter["ServerFingerprinter<br/>server_fingerprint.py"]
        SessionStateMachine["SessionStateMachine<br/>state_machine.py"]
        TokenBucketRateLimiter["TokenBucketRateLimiter<br/>rate_limiter.py"]
        EnhancedHTMLReporter["EnhancedHTMLReporter<br/>enhanced_reporter.py"]
    end
    
    subgraph "Optional Components"
        HeadlessBrowserXSSVerifier["HeadlessBrowserXSSVerifier<br/>headless_xss_verifier.py"]
        OASTProvider["OASTProvider<br/>oast_provider.py"]
        SessionHijackingTester["SessionHijackingTester<br/>session_hijacking_tester.py"]
    end
    
    WSHawkV2 --> MessageAnalyzer
    WSHawkV2 --> VulnerabilityVerifier
    WSHawkV2 --> ServerFingerprinter
    WSHawkV2 --> SessionStateMachine
    WSHawkV2 --> TokenBucketRateLimiter
    WSHawkV2 --> EnhancedHTMLReporter
    
    WSHawkV2 -.optional.-> HeadlessBrowserXSSVerifier
    WSHawkV2 -.optional.-> OASTProvider
    WSHawkV2 -.optional.-> SessionHijackingTester
    
    DefensiveValidator --> MessageAnalyzer
    PayloadMutator -.standalone.-> WSHawkV2

Sources: wshawk/scanner_v2.py:1-77, examples/basic_scan.py:1-41, examples/defensive_check.py:1-43, examples/mutation_demo.py:1-67


WSHawkV2 Class

The WSHawkV2 class is the primary entry point for offensive security scanning. It orchestrates all vulnerability detection modules and provides comprehensive WebSocket security testing.

Basic Initialization

from wshawk.scanner_v2 import WSHawkV2

# Minimal initialization
scanner = WSHawkV2("ws://target.com")

# With custom headers
scanner = WSHawkV2(
    "ws://target.com",
    headers={"Authorization": "Bearer token123"}
)

# With rate limiting
scanner = WSHawkV2(
    "ws://target.com",
    max_rps=5  # 5 requests per second
)

# With authentication sequence
scanner = WSHawkV2(
    "ws://target.com",
    auth_sequence="path/to/auth.yaml"
)

Constructor Parameters:

| Parameter | Type | Default | Description | |-----------|------|---------|-------------| | url | str | Required | WebSocket target URL (ws:// or wss://) | | headers | Dict[str, str] | None | Custom HTTP headers for WebSocket handshake | | auth_sequence | str | None | Path to YAML file with authentication message sequence | | max_rps | int | 10 | Maximum requests per second for rate limiting |

Sources: wshawk/scanner_v2.py:28-76, README.md:209-223, examples/basic_scan.py:13-22


Configuring Scanner Behavior

Feature Flags

The scanner supports optional features that can be enabled or disabled before running:

scanner = WSHawkV2("ws://target.com")

# Enable/disable Playwright browser verification
scanner.use_headless_browser = True  # Default: True

# Enable/disable OAST blind vulnerability detection
scanner.use_oast = True  # Default: True

Feature Flag Reference:

| Property | Type | Default | Purpose | |----------|------|---------|---------| | use_headless_browser | bool | True | Enables real browser XSS verification via Playwright | | use_oast | bool | True | Enables Out-of-Band testing for blind XXE/SSRF |

Sources: wshawk/scanner_v2.py:52-58, examples/basic_scan.py:19-21

Accessing Internal Components

All core modules are accessible as public attributes after initialization:

scanner = WSHawkV2("ws://target.com")

# Access message analyzer for custom logic
analyzer = scanner.message_analyzer

# Access verifier for custom verification
verifier = scanner.verifier

# Access rate limiter stats
rate_stats = scanner.rate_limiter.get_stats()

# Access fingerprint data
fingerprint_info = scanner.fingerprinter.get_info()

Available Attributes:

| Attribute | Type | Description | |-----------|------|-------------| | message_analyzer | MessageAnalyzer | Detects message formats (JSON/XML/Binary) and injectable fields | | verifier | VulnerabilityVerifier | Automated vulnerability verification with confidence levels | | fingerprinter | ServerFingerprinter | Server technology detection (language, framework, database) | | state_machine | SessionStateMachine | WebSocket connection state tracking | | rate_limiter | TokenBucketRateLimiter | Adaptive rate limiting controller | | reporter | EnhancedHTMLReporter | HTML report generator |

Sources: wshawk/scanner_v2.py:40-76


Running Scans

Complete Heuristic Scan

The run_heuristic_scan() method executes all 11 vulnerability tests in sequence:

import asyncio
from wshawk.scanner_v2 import WSHawkV2

async def scan():
    scanner = WSHawkV2("ws://target.com")
    
    # Runs: SQLi, XSS, Command Injection, Path Traversal,
    # XXE, NoSQL, SSRF, and Session Hijacking tests
    vulnerabilities = await scanner.run_heuristic_scan()
    
    return vulnerabilities

# Execute
results = asyncio.run(scan())

Scan Lifecycle:

stateDiagram-v2
    [*] --> Initialize
    Initialize --> Connect: scanner.run_heuristic_scan()
    Connect --> LearningPhase: await connect()
    LearningPhase --> MessageAnalysis: Observe 5s of traffic
    MessageAnalysis --> Fingerprinting: Detect format/fields
    Fingerprinting --> VulnerabilityTests: Identify tech stack
    
    VulnerabilityTests --> SQLInjection
    SQLInjection --> XSS
    XSS --> CommandInjection
    CommandInjection --> PathTraversal
    PathTraversal --> XXE
    XXE --> NoSQL
    NoSQL --> SSRF
    SSRF --> SessionTests
    
    SessionTests --> Cleanup
    Cleanup --> ReportGeneration
    ReportGeneration --> [*]: Return vulnerabilities[]

Sources: wshawk/scanner_v2.py:542-677, examples/basic_scan.py:25-26

Individual Test Methods

Each vulnerability type has a dedicated async method that can be called independently:

async def custom_scan():
    scanner = WSHawkV2("ws://target.com")
    
    # Connect manually
    ws = await scanner.connect()
    if not ws:
        return
    
    # Run learning phase
    await scanner.learning_phase(ws, duration=5)
    
    # Run specific tests
    sqli_results = await scanner.test_sql_injection_v2(ws)
    xss_results = await scanner.test_xss_v2(ws)
    
    # Close connection
    await ws.close()
    
    return scanner.vulnerabilities

Available Test Methods:

| Method | Line Reference | Vulnerabilities Detected | |--------|---------------|--------------------------| | test_sql_injection_v2(ws) | scanner_v2.py:143-210 | SQL Injection with database-specific payloads | | test_xss_v2(ws) | scanner_v2.py:212-290 | Cross-Site Scripting with context analysis | | test_command_injection_v2(ws) | scanner_v2.py:292-356 | OS Command Injection with timing attacks | | test_path_traversal_v2(ws) | scanner_v2.py:358-397 | Directory Traversal / LFI | | test_xxe_v2(ws) | scanner_v2.py:399-453 | XML External Entity with OAST | | test_nosql_injection_v2(ws) | scanner_v2.py:455-493 | NoSQL Injection (MongoDB, etc.) | | test_ssrf_v2(ws) | scanner_v2.py:495-540 | Server-Side Request Forgery |

Sources: wshawk/scanner_v2.py:143-540


Working with Results

Vulnerability Data Structure

All detected vulnerabilities are appended to scanner.vulnerabilities as dictionaries:

vulnerabilities = await scanner.run_heuristic_scan()

for vuln in vulnerabilities:
    print(f"Type:        {vuln['type']}")
    print(f"Severity:    {vuln['severity']}")
    print(f"Confidence:  {vuln['confidence']}")
    print(f"Description: {vuln['description']}")
    print(f"Payload:     {vuln['payload']}")
    print(f"Response:    {vuln['response_snippet']}")
    print(f"Fix:         {vuln['recommendation']}")
    print()

Vulnerability Dictionary Schema:

| Key | Type | Description | |-----|------|-------------| | type | str | Vulnerability category (e.g., "SQL Injection", "XSS") | | severity | str | Risk level: "CRITICAL", "HIGH", "MEDIUM", "LOW" | | confidence | str | Detection confidence: "CRITICAL", "HIGH", "MEDIUM", "LOW" | | description | str | Human-readable explanation of the finding | | payload | str | Attack payload that triggered the vulnerability | | response_snippet | str | First 200 characters of server response | | recommendation | str | Remediation guidance | | browser_verified | bool | (XSS only) Whether Playwright confirmed execution | | cvss_score | float | (Session tests only) CVSS v3.1 score |

Sources: wshawk/scanner_v2.py:190-199, wshawk/scanner_v2.py:270-280, examples/basic_scan.py:32-36

Statistics and Metrics

Access scan metrics through scanner attributes:

await scanner.run_heuristic_scan()

# Request/response counts
print(f"Sent: {scanner.messages_sent}")
print(f"Received: {scanner.messages_received}")

# Timing
duration = (scanner.end_time - scanner.start_time).total_seconds()
print(f"Duration: {duration:.2f}s")

# Rate limiter stats
stats = scanner.rate_limiter.get_stats()
print(f"Total requests: {stats['total_requests']}")
print(f"Wait events: {stats['total_waits']}")
print(f"Current rate: {stats['current_rate']} req/s")
print(f"Adaptive adjustments: {stats['adaptive_adjustments']}")

Sources: wshawk/scanner_v2.py:65-75, wshawk/scanner_v2.py:631-675


Advanced Features

Playwright XSS Verification

Enable browser-based XSS verification to eliminate false positives:

scanner = WSHawkV2("ws://target.com")
scanner.use_headless_browser = True  # Default is True

# The scanner automatically initializes Playwright when needed
vulnerabilities = await scanner.run_heuristic_scan()

# Check which XSS findings were browser-verified
for vuln in vulnerabilities:
    if vuln['type'] == 'Cross-Site Scripting (XSS)':
        if vuln.get('browser_verified'):
            print(f"REAL XSS: {vuln['payload']}")

How Browser Verification Works:

sequenceDiagram
    participant Scanner as scanner.test_xss_v2()
    participant Verifier as VulnerabilityVerifier
    participant Browser as HeadlessBrowserXSSVerifier
    participant Playwright as Playwright API
    
    Scanner->>Verifier: verify_xss(response, payload)
    Verifier->>Scanner: confidence=HIGH (pattern matched)
    
    Scanner->>Browser: verify_xss_execution(response, payload)
    Browser->>Playwright: page.goto(html_with_payload)
    Playwright->>Playwright: Wait for console.log/alert
    Playwright->>Browser: Event captured
    Browser->>Scanner: is_executed=True, evidence=...
    
    Scanner->>Scanner: Set confidence=CRITICAL<br/>browser_verified=True

Sources: wshawk/scanner_v2.py:248-269, wshawk/scanner_v2.py:52-54

OAST Blind Vulnerability Detection

Enable Out-of-Band Application Security Testing for blind XXE and SSRF:

scanner = WSHawkV2("ws://target.com")
scanner.use_oast = True  # Default is True

# OAST provider automatically registers with interact.sh
vulnerabilities = await scanner.run_heuristic_scan()

# The scanner polls for callbacks after sending payloads
# Blind vulnerabilities are detected via DNS/HTTP interactions

OAST Integration Flow:

sequenceDiagram
    participant Scanner as scanner.test_xxe_v2()
    participant OAST as OASTProvider
    participant InteractSH as interact.sh
    participant Target as WebSocket Server
    
    Scanner->>OAST: start()
    OAST->>InteractSH: Register unique domain
    InteractSH->>OAST: xxxx.oast.fun
    
    Scanner->>OAST: generate_payload('xxe', 'test1')
    OAST->>Scanner: <!ENTITY xxe SYSTEM "http://test1.xxxx.oast.fun">
    
    Scanner->>Target: Send XXE payload
    Target->>InteractSH: DNS query for test1.xxxx.oast.fun
    
    Scanner->>OAST: poll_interactions()
    OAST->>InteractSH: Check for callbacks
    InteractSH->>OAST: [DNS query from X.X.X.X]
    OAST->>Scanner: Blind XXE confirmed!

Sources: wshawk/scanner_v2.py:56-58, wshawk/scanner_v2.py:406-451

Session Hijacking Tests

Session security tests run automatically at the end of run_heuristic_scan():

# Manual session testing
from wshawk.session_hijacking_tester import SessionHijackingTester

async def test_sessions():
    tester = SessionHijackingTester("ws://target.com")
    
    # Run all 6 session security tests
    results = await tester.run_all_tests()
    
    for result in results:
        if result.is_vulnerable:
            print(f"[{result.confidence}] {result.vuln_type.value}")
            print(f"  {result.description}")
            print(f"  CVSS: {result.cvss_score}")
            print(f"  Fix: {result.recommendation}")

Sources: wshawk/scanner_v2.py:589-613


Defensive Validation API

The DefensiveValidator class provides blue team security control validation:

from wshawk.defensive_validation import DefensiveValidator

async def validate_defenses():
    validator = DefensiveValidator("ws://your-server.com")
    
    # Run all 4 defensive checks
    results = await validator.run_all_checks()
    
    # Process results
    for check in results:
        status = "PASS" if check["passed"] else "FAIL"
        print(f"[{status}] {check['name']}")
        print(f"  Score: {check.get('cvss_score', 'N/A')}")
        print(f"  Details: {check['details']}")
        if not check["passed"]:
            print(f"  Recommendation: {check['recommendation']}")
    
    # Calculate pass rate
    passed = sum(1 for c in results if c["passed"])
    total = len(results)
    print(f"\nDefensive Posture: {passed}/{total} checks passed")

asyncio.run(validate_defenses())

Defensive Checks:

| Method | Test Category | Description | |--------|---------------|-------------| | test_dns_exfiltration_prevention() | DNS Egress | Validates egress filtering blocks DNS-based exfiltration | | test_bot_detection() | Anti-Bot | Tests if headless browser detection works | | test_cswsh() | Origin Validation | Tests 216+ malicious origins for CSRF protection | | test_wss_security() | TLS/SSL | Validates protocol version, ciphers, certificates |

Sources: examples/defensive_check.py:1-43


Payload Mutation API

The PayloadMutator class provides standalone payload generation and WAF evasion:

from wshawk.payload_mutator import PayloadMutator, MutationStrategy

def generate_payloads():
    mutator = PayloadMutator()
    
    # Generate variants using specific strategy
    base_payload = "<script>alert(1)</script>"
    variants = mutator.mutate_payload(
        base_payload,
        MutationStrategy.CASE_VARIATION,
        count=5
    )
    
    for variant in variants:
        print(variant)
    
    # Adaptive payload generation (auto-selects strategies)
    adaptive = mutator.generate_adaptive_payloads(
        base_payload,
        max_count=20
    )
    
    # Learn from WAF responses
    mutator.learn_from_response(
        payload="<script>alert(1)</script>",
        response="403 Forbidden - Cloudflare",
        is_blocked=True,
        response_time=0.02
    )
    
    # Get recommended strategy after learning
    recommended = mutator.get_recommended_strategy()
    print(f"Recommended: {recommended.value}")

Available Mutation Strategies:

| Strategy | Enum Value | Use Case | |----------|-----------|----------| | Case Variation | CASE_VARIATION | Bypass case-sensitive filters | | URL Encoding | URL_ENCODING | Evade signature-based WAFs | | Unicode | UNICODE_ENCODING | Bypass charset filters | | HTML Encoding | HTML_ENTITY | Evade HTML parsers | | Comment Injection | COMMENT_INJECTION | Break signature patterns | | Whitespace | WHITESPACE_VARIATION | Bypass regex anchors | | Concatenation | CONCATENATION | Evade string matching | | Null Bytes | NULL_BYTE | Exploit C-style parsing |

Sources: examples/mutation_demo.py:1-67, wshawk/payload_mutator.py


Integration Patterns

CI/CD Integration

import asyncio
import sys
from wshawk.scanner_v2 import WSHawkV2

async def ci_scan(target, max_severity="MEDIUM"):
    scanner = WSHawkV2(target, max_rps=5)
    scanner.use_headless_browser = False  # Faster for CI
    
    vulns = await scanner.run_heuristic_scan()
    
    # Filter by severity
    severity_order = {"CRITICAL": 4, "HIGH": 3, "MEDIUM": 2, "LOW": 1}
    threshold = severity_order.get(max_severity, 2)
    
    failures = [
        v for v in vulns 
        if severity_order.get(v['confidence'], 0) >= threshold
    ]
    
    if failures:
        print(f"FAIL: {len(failures)} vulnerabilities >= {max_severity}")
        for v in failures:
            print(f"  [{v['confidence']}] {v['type']}")
        sys.exit(1)
    else:
        print(f"PASS: No vulnerabilities >= {max_severity}")
        sys.exit(0)

asyncio.run(ci_scan("ws://staging.example.com", "HIGH"))

Sources: examples/basic_scan.py:1-41

Custom Payload Integration

async def custom_payloads_scan():
    scanner = WSHawkV2("ws://target.com")
    
    # Connect and learn
    ws = await scanner.connect()
    await scanner.learning_phase(ws, duration=5)
    
    # Custom SQL injection payloads
    custom_sqli = [
        "1' AND SLEEP(5)--",
        "' OR '1'='1' UNION SELECT password FROM users--",
        "admin'--"
    ]
    
    # Inject custom payloads
    for payload in custom_sqli:
        # Use message analyzer to inject into correct field
        injected = scanner.message_analyzer.inject_payload_into_message(
            scanner.sample_messages[0],
            payload
        )
        
        for msg in injected:
            await ws.send(msg)
            response = await asyncio.wait_for(ws.recv(), timeout=2.0)
            
            # Use verifier for automated analysis
            is_vuln, confidence, desc = scanner.verifier.verify_sql_injection(
                response, payload
            )
            
            if is_vuln:
                print(f"[{confidence.value}] {desc}")
                print(f"  Payload: {payload}")
    
    await ws.close()

Sources: wshawk/scanner_v2.py:143-210

Multi-Target Scanning

async def scan_multiple_targets(targets):
    results = {}
    
    for target in targets:
        scanner = WSHawkV2(target, max_rps=3)
        scanner.use_headless_browser = False
        
        try:
            vulns = await scanner.run_heuristic_scan()
            results[target] = {
                "success": True,
                "vulnerabilities": len(vulns),
                "critical": sum(1 for v in vulns if v['confidence'] == 'CRITICAL'),
                "high": sum(1 for v in vulns if v['confidence'] == 'HIGH')
            }
        except Exception as e:
            results[target] = {
                "success": False,
                "error": str(e)
            }
    
    return results

# Run parallel scans
import asyncio
targets = [
    "ws://api1.example.com",
    "ws://api2.example.com",
    "ws://api3.example.com"
]
results = asyncio.run(scan_multiple_targets(targets))

Sources: wshawk/scanner_v2.py:28-77


Error Handling

All async methods may raise exceptions that should be handled:

import asyncio
import websockets

async def robust_scan():
    scanner = WSHawkV2("ws://target.com")
    
    try:
        results = await scanner.run_heuristic_scan()
        return results
        
    except websockets.exceptions.InvalidURI as e:
        print(f"Invalid WebSocket URL: {e}")
        
    except websockets.exceptions.InvalidHandshake as e:
        print(f"Handshake failed (check auth/headers): {e}")
        
    except ConnectionRefusedError:
        print("Connection refused - server not reachable")
        
    except asyncio.TimeoutError:
        print("Scan timeout - server not responding")
        
    except Exception as e:
        print(f"Unexpected error: {e}")
        
    finally:
        # Cleanup verification resources
        if scanner.headless_verifier:
            await scanner.headless_verifier.stop()
        if scanner.oast_provider:
            await scanner.oast_provider.stop()

Common Exceptions:

| Exception | Cause | Solution | |-----------|-------|----------| | websockets.exceptions.InvalidURI | Malformed WebSocket URL | Verify URL format (ws:// or wss://) | | websockets.exceptions.InvalidHandshake | Authentication failure | Check headers, credentials, CORS | | ConnectionRefusedError | Target unreachable | Verify network connectivity, firewall rules | | asyncio.TimeoutError | Slow server responses | Increase timeout in individual test methods | | playwright._impl._api_types.Error | Browser automation failure | Ensure playwright install chromium ran |

Sources: wshawk/scanner_v2.py:77-85, wshawk/scanner_v2.py:615-628