Creating Protocol Plugins

Creating Protocol Plugins

The following files were used as context for generating this wiki page:

Purpose and Scope

This document provides a technical guide for creating custom ProtocolPlugin implementations that handle non-standard WebSocket message protocols such as Protobuf, MessagePack, CBOR, or proprietary binary formats. Protocol plugins transform incoming messages for analysis and outgoing payloads for injection.

For information about creating custom payload collections, see Creating Payload Plugins. For custom vulnerability detection logic, see Creating Detector Plugins. For plugin validation and versioning requirements, see Plugin Validation and Versioning.

Sources: wshawk/plugin_system.py:86-96, plugins/README.md:1-37


ProtocolPlugin Base Class

The ProtocolPlugin abstract base class defines the interface for custom protocol handlers. All protocol plugins must inherit from ProtocolPlugin and implement its abstract methods.

Class Hierarchy

graph TB
    ABC["ABC<br/>(Python Abstract Base)"]
    PluginBase["PluginBase<br/>wshawk/plugin_system.py:35-50"]
    ProtocolPlugin["ProtocolPlugin<br/>wshawk/plugin_system.py:86-96"]
    CustomProto["CustomProtocolPlugin<br/>(Your Implementation)"]
    
    ABC --> PluginBase
    PluginBase --> ProtocolPlugin
    ProtocolPlugin --> CustomProto
    
    PluginBase -->|"requires"| GetMetadata["get_metadata() -> PluginMetadata"]
    ProtocolPlugin -->|"requires"| HandleMessage["handle_message(message, context) -> str"]
    ProtocolPlugin -->|"optional"| GetProtocolName["get_protocol_name() -> str"]

Sources: wshawk/plugin_system.py:35-50, wshawk/plugin_system.py:86-96

Base Class Structure

| Method | Type | Purpose | |--------|------|---------| | get_metadata() | Abstract (from PluginBase) | Returns PluginMetadata with name, version, description | | handle_message(message, context) | Abstract | Processes incoming/outgoing messages | | get_protocol_name() | Concrete (overridable) | Returns protocol identifier (default: "custom") | | get_name() | Inherited | Returns plugin name from metadata | | get_version() | Inherited | Returns plugin version from metadata |

Sources: wshawk/plugin_system.py:35-96


Required Method Implementations

1. get_metadata()

Returns a PluginMetadata dataclass instance containing plugin identification and compatibility information.

Required Fields:

  • name (str): Unique plugin identifier (lowercase, underscores)
  • version (str): Semantic version (e.g., "1.0.0")
  • description (str): Human-readable description
  • author (str): Plugin author (default: "Regaan")
  • requires (List[str]): Python package dependencies (default: None)
  • min_wshawk_version (str): Minimum compatible WSHawk version (default: "2.0.0")

Example Implementation:

def get_metadata(self) -> PluginMetadata:
    return PluginMetadata(
        name="protobuf_handler",
        version="1.0.0",
        description="Protocol Buffers message handler",
        author="Security Team",
        requires=["protobuf>=4.0.0"],
        min_wshawk_version="3.0.0"
    )

Validation: The PluginManager._validate_plugin() method validates metadata fields, version format (semver), and compatibility with current WSHawk version.

Sources: wshawk/plugin_system.py:17-33, wshawk/plugin_system.py:203-235

2. handle_message()

Asynchronous method that processes WebSocket messages. This method is called for both incoming responses (for analysis) and outgoing payloads (for transformation).

Signature:

async def handle_message(self, message: str, context: Dict = None) -> str

Parameters:

  • message (str): Raw message content (JSON string, binary data as base64, etc.)
  • context (Dict, optional): Additional information
    • direction: "inbound" or "outbound"
    • message_type: "text" or "binary"
    • timestamp: Message timestamp
    • metadata: Custom key-value pairs

Returns: Processed message string

Example Implementation:

async def handle_message(self, message: str, context: Dict = None) -> str:
    context = context or {}
    direction = context.get('direction', 'unknown')
    
    if direction == 'outbound':
        # Transform payload before sending
        return self._encode_protobuf(message)
    else:
        # Decode response for analysis
        return self._decode_protobuf(message)

Sources: wshawk/plugin_system.py:89-92

3. get_protocol_name() (Optional)

Returns a string identifier for the protocol. Used for logging and plugin selection.

Default Implementation: Returns "custom"

Override Example:

def get_protocol_name(self) -> str:
    return "protobuf"

Sources: wshawk/plugin_system.py:94-96


Message Handling Flow

This diagram illustrates how protocol plugins integrate with the scanning engine and message processing pipeline.

graph TB
    subgraph "Scanner (scanner_v2.py)"
        Scanner["WSHawkV2.run_heuristic_scan()"]
        SendPayload["send_payload()"]
        ReceiveMsg["receive_message()"]
    end
    
    subgraph "PluginManager (plugin_system.py:98-439)"
        PluginMgr["PluginManager"]
        LazyLoad["_load_plugin_lazy()<br/>Lines 158-201"]
        Registry["_protocol_plugins: Dict<br/>Line 116"]
    end
    
    subgraph "ProtocolPlugin (Your Implementation)"
        HandleMsg["handle_message(message, context)"]
        Encode["_encode_protocol()"]
        Decode["_decode_protocol()"]
    end
    
    subgraph "Message Flow"
        RawPayload["Raw Payload<br/>(String/Dict)"]
        EncodedPayload["Encoded Payload<br/>(Protocol Format)"]
        WSSend["WebSocket.send()"]
        WSRecv["WebSocket.recv()"]
        EncodedResp["Encoded Response<br/>(Protocol Format)"]
        DecodedResp["Decoded Response<br/>(String/Dict)"]
        Analyzer["MessageAnalyzer<br/>VulnerabilityVerifier"]
    end
    
    Scanner --> |"needs protocol plugin"| PluginMgr
    PluginMgr --> LazyLoad
    LazyLoad --> Registry
    Registry --> HandleMsg
    
    SendPayload --> RawPayload
    RawPayload --> |"context: direction=outbound"| HandleMsg
    HandleMsg --> Encode
    Encode --> EncodedPayload
    EncodedPayload --> WSSend
    
    WSRecv --> EncodedResp
    EncodedResp --> |"context: direction=inbound"| HandleMsg
    HandleMsg --> Decode
    Decode --> DecodedResp
    DecodedResp --> Analyzer
    Analyzer --> ReceiveMsg

Sources: wshawk/plugin_system.py:98-439, wshawk/plugin_system.py:158-201, wshawk/plugin_system.py:301-311


Example Protocol Plugin Implementations

Example 1: Protobuf Handler

from wshawk.plugin_system import ProtocolPlugin, PluginMetadata
from google.protobuf import message as proto_message
import base64
from typing import Dict

class ProtobufHandler(ProtocolPlugin):
    """Handles Protocol Buffers serialized messages"""
    
    def get_metadata(self) -> PluginMetadata:
        return PluginMetadata(
            name="protobuf_handler",
            version="1.0.0",
            description="Protocol Buffers binary message handler",
            author="WSHawk Team",
            requires=["protobuf>=4.0.0"],
            min_wshawk_version="3.0.0"
        )
    
    async def handle_message(self, message: str, context: Dict = None) -> str:
        context = context or {}
        direction = context.get('direction', 'inbound')
        
        try:
            if direction == 'outbound':
                # Encode JSON to protobuf (simplified)
                return self._json_to_protobuf(message)
            else:
                # Decode protobuf to JSON for analysis
                return self._protobuf_to_json(message)
        except Exception as e:
            print(f"[ERROR] Protobuf handling failed: {e}")
            return message  # Return original on failure
    
    def get_protocol_name(self) -> str:
        return "protobuf"
    
    def _json_to_protobuf(self, json_str: str) -> str:
        # Implementation: Convert JSON to protobuf bytes, base64 encode
        pass
    
    def _protobuf_to_json(self, protobuf_bytes: str) -> str:
        # Implementation: Decode base64, parse protobuf, convert to JSON
        pass

Sources: wshawk/plugin_system.py:86-96, wshawk/plugin_system.py:17-33

Example 2: MessagePack Binary Handler

from wshawk.plugin_system import ProtocolPlugin, PluginMetadata
import msgpack
import json
from typing import Dict

class MessagePackHandler(ProtocolPlugin):
    """Handles MessagePack binary serialization"""
    
    def get_metadata(self) -> PluginMetadata:
        return PluginMetadata(
            name="msgpack_handler",
            version="1.0.0",
            description="MessagePack binary format handler",
            author="WSHawk Team",
            requires=["msgpack>=1.0.0"],
            min_wshawk_version="3.0.0"
        )
    
    async def handle_message(self, message: str, context: Dict = None) -> str:
        context = context or {}
        msg_type = context.get('message_type', 'text')
        
        if msg_type == 'binary':
            try:
                # Unpack binary MessagePack to dict
                unpacked = msgpack.unpackb(message.encode('latin1'))
                # Return as JSON string for analysis
                return json.dumps(unpacked)
            except Exception as e:
                print(f"[ERROR] MessagePack decode failed: {e}")
                return message
        else:
            # For text messages, try to pack as MessagePack
            try:
                data = json.loads(message)
                packed = msgpack.packb(data)
                return packed.decode('latin1')
            except Exception:
                return message
    
    def get_protocol_name(self) -> str:
        return "msgpack"

Sources: wshawk/plugin_system.py:86-96

Example 3: Custom Binary Protocol

from wshawk.plugin_system import ProtocolPlugin, PluginMetadata
import struct
import json
from typing import Dict

class CustomBinaryProtocol(ProtocolPlugin):
    """
    Handles custom binary protocol with structure:
    [1 byte: message_type][4 bytes: length][N bytes: payload]
    """
    
    MSG_TYPE_DATA = 0x01
    MSG_TYPE_COMMAND = 0x02
    
    def get_metadata(self) -> PluginMetadata:
        return PluginMetadata(
            name="custom_binary",
            version="1.0.0",
            description="Custom binary protocol handler",
            author="Security Team",
            min_wshawk_version="3.0.0"
        )
    
    async def handle_message(self, message: str, context: Dict = None) -> str:
        context = context or {}
        direction = context.get('direction', 'inbound')
        
        if direction == 'outbound':
            return self._encode_binary(message)
        else:
            return self._decode_binary(message)
    
    def get_protocol_name(self) -> str:
        return "custom_binary"
    
    def _encode_binary(self, json_message: str) -> str:
        """Encode JSON payload to custom binary format"""
        try:
            data = json.loads(json_message)
            payload = json.dumps(data).encode('utf-8')
            
            # Pack: [msg_type][length][payload]
            header = struct.pack('!BI', self.MSG_TYPE_DATA, len(payload))
            binary_msg = header + payload
            
            # Return as latin1 string for WebSocket
            return binary_msg.decode('latin1')
        except Exception as e:
            print(f"[ERROR] Binary encode failed: {e}")
            return json_message
    
    def _decode_binary(self, binary_message: str) -> str:
        """Decode custom binary format to JSON"""
        try:
            # Convert back to bytes
            binary_data = binary_message.encode('latin1')
            
            # Unpack header
            msg_type, length = struct.unpack('!BI', binary_data[:5])
            payload = binary_data[5:5+length]
            
            # Parse payload as JSON
            json_data = json.loads(payload.decode('utf-8'))
            
            # Add metadata about message type
            json_data['_binary_msg_type'] = msg_type
            
            return json.dumps(json_data)
        except Exception as e:
            print(f"[ERROR] Binary decode failed: {e}")
            return binary_message

Sources: wshawk/plugin_system.py:86-96


Plugin Registration and Discovery

Automatic Discovery

The PluginManager automatically scans the plugins/ directory for Python files and registers ProtocolPlugin implementations via lazy loading.

Discovery Process:

graph TB
    Init["PluginManager.__init__()<br/>Line 109"]
    EnsureDir["_ensure_directories()<br/>Lines 136-139"]
    ScanAvail["_scan_available_plugins()<br/>Lines 141-156"]
    
    LoadLazy["_load_plugin_lazy(name)<br/>Lines 158-201"]
    ImportMod["importlib.import_module()"]
    FindClass["Iterate dir(module)"]
    CheckType["isinstance(attr, type) and<br/>issubclass(attr, ProtocolPlugin)"]
    Validate["_validate_plugin(plugin)<br/>Lines 203-235"]
    Register["_register_plugin_internal(plugin)<br/>Lines 263-313"]
    
    Init --> EnsureDir
    EnsureDir --> ScanAvail
    ScanAvail --> |"stores available plugins"| AvailDict["_available_plugins: Dict"]
    
    AvailDict --> |"when needed"| LoadLazy
    LoadLazy --> ImportMod
    ImportMod --> FindClass
    FindClass --> CheckType
    CheckType --> |"is ProtocolPlugin"| Validate
    Validate --> |"passes validation"| Register
    Register --> |"adds to"| RegDict["_protocol_plugins: Dict<br/>Line 116"]

Discovery Rules:

  • Files in plugins/ directory ending with .py
  • Not starting with _ (underscore)
  • Contains classes inheriting from ProtocolPlugin
  • Passes metadata validation

Sources: wshawk/plugin_system.py:109-156, wshawk/plugin_system.py:158-201

Manual Registration

Protocol plugins can also be registered programmatically:

from wshawk.plugin_system import PluginManager

# Create manager
manager = PluginManager(plugin_dir="plugins")

# Create plugin instance
protocol_plugin = ProtobufHandler()

# Register manually
success = manager.register_plugin(protocol_plugin)
if success:
    print(f"Registered: {protocol_plugin.get_name()}")

Thread Safety: Registration operations are protected by a threading.Lock to ensure thread-safe concurrent access.

Sources: wshawk/plugin_system.py:320-333, wshawk/plugin_system.py:127

Duplicate Detection

The PluginManager prevents duplicate protocol plugin registration by default:

| Policy | Behavior | |--------|----------| | allow_override = False (default) | Duplicate plugin names rejected with error message | | allow_override = True | Later plugin replaces earlier plugin with warning |

Implementation: See wshawk/plugin_system.py:302-311 for duplicate detection logic.

Sources: wshawk/plugin_system.py:130, wshawk/plugin_system.py:302-311


Integration with Scanner

Accessing Protocol Plugins in Scanner

While the current codebase does not show explicit scanner integration for protocol plugins, the intended integration pattern follows this structure:

# In scanner_v2.py or custom scanning code
from wshawk.plugin_system import PluginManager

class WSHawkV2:
    def __init__(self, url):
        self.url = url
        self.plugin_manager = PluginManager()
        self.protocol_plugin = None
    
    def set_protocol(self, protocol_name: str):
        """Select protocol plugin by name"""
        # Load plugin if not already loaded
        self.plugin_manager._load_plugin_lazy(protocol_name)
        
        # Get plugin instance
        if protocol_name in self.plugin_manager._protocol_plugins:
            self.protocol_plugin = self.plugin_manager._protocol_plugins[protocol_name]
            print(f"[OK] Using protocol: {protocol_name}")
        else:
            print(f"[ERROR] Protocol plugin '{protocol_name}' not found")
    
    async def send_payload(self, payload: str):
        """Send payload through protocol plugin"""
        if self.protocol_plugin:
            context = {'direction': 'outbound', 'timestamp': time.time()}
            encoded = await self.protocol_plugin.handle_message(payload, context)
            await self.ws.send(encoded)
        else:
            await self.ws.send(payload)
    
    async def receive_response(self):
        """Receive and decode response"""
        raw_response = await self.ws.recv()
        
        if self.protocol_plugin:
            context = {'direction': 'inbound', 'timestamp': time.time()}
            decoded = await self.protocol_plugin.handle_message(raw_response, context)
            return decoded
        else:
            return raw_response

Sources: wshawk/plugin_system.py:301-313

Configuration Example

# wshawk.yaml
plugins:
  enabled: true
  directory: "plugins"
  protocol: "protobuf_handler"  # Auto-load this protocol

scanning:
  use_protocol_plugin: true

Sources: plugins/README.md:30-32


Testing Protocol Plugins

Unit Testing Template

import pytest
import asyncio
from your_plugin import CustomBinaryProtocol

@pytest.mark.asyncio
async def test_protocol_encode_decode():
    plugin = CustomBinaryProtocol()
    
    # Test outbound encoding
    json_payload = '{"action": "test", "data": "payload"}'
    context_out = {'direction': 'outbound'}
    
    encoded = await plugin.handle_message(json_payload, context_out)
    assert isinstance(encoded, str)
    assert len(encoded) > len(json_payload)  # Binary should have header
    
    # Test inbound decoding
    context_in = {'direction': 'inbound'}
    decoded = await plugin.handle_message(encoded, context_in)
    
    # Should decode back to original structure
    import json
    decoded_data = json.loads(decoded)
    assert decoded_data['action'] == 'test'
    assert decoded_data['data'] == 'payload'

@pytest.mark.asyncio
async def test_protocol_metadata():
    plugin = CustomBinaryProtocol()
    metadata = plugin.get_metadata()
    
    assert metadata.name == "custom_binary"
    assert metadata.version == "1.0.0"
    assert plugin.get_protocol_name() == "custom_binary"

@pytest.mark.asyncio
async def test_protocol_error_handling():
    plugin = CustomBinaryProtocol()
    
    # Test with invalid input
    invalid_input = "not valid binary data"
    context = {'direction': 'inbound'}
    
    # Should not crash
    result = await plugin.handle_message(invalid_input, context)
    assert result is not None

Sources: wshawk/plugin_system.py:86-96

Manual Integration Testing

from wshawk.plugin_system import PluginManager
import asyncio

async def test_integration():
    manager = PluginManager()
    
    # Register plugin
    protocol = CustomBinaryProtocol()
    manager.register_plugin(protocol)
    
    # List registered protocols
    plugins = manager.list_plugins()
    print(f"Protocol plugins: {plugins['protocol_plugins']}")
    
    # Test message handling
    test_message = '{"test": "data"}'
    result = await protocol.handle_message(test_message, {'direction': 'outbound'})
    print(f"Encoded: {len(result)} bytes")
    
    decoded = await protocol.handle_message(result, {'direction': 'inbound'})
    print(f"Decoded: {decoded}")

if __name__ == "__main__":
    asyncio.run(test_integration())

Sources: wshawk/plugin_system.py:498-550


Best Practices

1. Error Handling

Always implement robust error handling in handle_message():

async def handle_message(self, message: str, context: Dict = None) -> str:
    try:
        # Protocol-specific processing
        return self._process(message)
    except ProtocolDecodeError as e:
        print(f"[ERROR] Protocol decode failed: {e}")
        return message  # Return original on failure
    except Exception as e:
        print(f"[ERROR] Unexpected error in protocol plugin: {e}")
        return message

Rationale: Scanner should continue operation even if protocol plugin fails.

2. Context Validation

Always validate and provide defaults for context dictionary:

async def handle_message(self, message: str, context: Dict = None) -> str:
    context = context or {}
    direction = context.get('direction', 'unknown')
    msg_type = context.get('message_type', 'text')
    
    # Use validated context values
    if direction == 'outbound':
        # ...

3. Performance Considerations

Use Caching: For expensive operations like schema compilation:

class ProtobufHandler(ProtocolPlugin):
    def __init__(self):
        self._schema_cache = {}
    
    def _get_schema(self, schema_name: str):
        if schema_name not in self._schema_cache:
            self._schema_cache[schema_name] = self._load_schema(schema_name)
        return self._schema_cache[schema_name]

Lazy Initialization: Don't load heavy dependencies in __init__:

def __init__(self):
    self._encoder = None  # Initialize on first use

async def handle_message(self, message: str, context: Dict = None) -> str:
    if self._encoder is None:
        self._encoder = self._initialize_encoder()
    # ...

4. Thread Safety

Protocol plugins may be called from multiple threads. Ensure thread-safe access to shared state:

import threading

class CustomProtocol(ProtocolPlugin):
    def __init__(self):
        self._cache = {}
        self._lock = threading.Lock()
    
    async def handle_message(self, message: str, context: Dict = None) -> str:
        with self._lock:
            # Thread-safe cache access
            if message in self._cache:
                return self._cache[message]

Note: The PluginManager itself is thread-safe (see wshawk/plugin_system.py:127).

5. Metadata Best Practices

| Field | Best Practice | |-------|--------------| | name | Lowercase, underscores, descriptive (e.g., protobuf_v3) | | version | Strict semver: MAJOR.MINOR.PATCH | | description | Concise, include protocol version if applicable | | requires | List all external dependencies with version constraints | | min_wshawk_version | Conservative estimate, test compatibility |

6. Binary Data Handling

When handling binary WebSocket messages, use latin1 encoding for string conversion:

# Binary to string
binary_bytes = b'\x01\x02\x03\x04'
string_repr = binary_bytes.decode('latin1')

# String to binary
binary_bytes = string_repr.encode('latin1')

Rationale: latin1 encoding preserves byte values 0-255 exactly, preventing data corruption.

7. Logging and Debugging

Implement informative logging for debugging:

async def handle_message(self, message: str, context: Dict = None) -> str:
    direction = context.get('direction', 'unknown') if context else 'unknown'
    
    print(f"[PROTOCOL] {self.get_protocol_name()}: {direction}")
    print(f"[PROTOCOL] Input length: {len(message)} bytes")
    
    result = self._process(message)
    
    print(f"[PROTOCOL] Output length: {len(result)} bytes")
    return result

8. Validation and Sanitization

Validate protocol-specific constraints:

async def handle_message(self, message: str, context: Dict = None) -> str:
    # Validate message structure
    if not self._is_valid_format(message):
        print(f"[WARNING] Invalid protocol format, skipping transformation")
        return message
    
    # Validate size constraints
    if len(message) > self.MAX_MESSAGE_SIZE:
        print(f"[ERROR] Message exceeds maximum size")
        return message[:self.MAX_MESSAGE_SIZE]
    
    return self._transform(message)

Sources: wshawk/plugin_system.py:86-96, wshawk/plugin_system.py:203-235, wshawk/plugin_system.py:127


Plugin File Structure

Directory Organization

wshawk/
├── plugins/
│   ├── __init__.py                  # Empty or plugin imports
│   ├── README.md                    # Plugin documentation
│   ├── protobuf_handler.py          # Protocol plugin
│   ├── msgpack_handler.py           # Protocol plugin
│   └── custom_binary.py             # Protocol plugin
├── wshawk/
│   └── plugin_system.py             # Core plugin infrastructure
└── wshawk.yaml                      # Configuration (optional)

Plugin File Template

"""
Custom Protocol Plugin for WSHawk
Handles: [protocol name and version]
"""

from wshawk.plugin_system import ProtocolPlugin, PluginMetadata
from typing import Dict

class MyProtocolHandler(ProtocolPlugin):
    """
    [Brief description of protocol]
    """
    
    # Protocol constants
    VERSION = "1.0"
    MAX_MESSAGE_SIZE = 1024 * 1024  # 1MB
    
    def get_metadata(self) -> PluginMetadata:
        return PluginMetadata(
            name="my_protocol",
            version="1.0.0",
            description="[Protocol description]",
            author="[Your name]",
            requires=["[dependency>=version]"],
            min_wshawk_version="3.0.0"
        )
    
    async def handle_message(self, message: str, context: Dict = None) -> str:
        """
        Transform messages between protocol format and analysis format
        
        Args:
            message: Raw message content
            context: Metadata (direction, message_type, etc.)
        
        Returns:
            Transformed message string
        """
        context = context or {}
        direction = context.get('direction', 'unknown')
        
        try:
            if direction == 'outbound':
                return self._encode(message)
            else:
                return self._decode(message)
        except Exception as e:
            print(f"[ERROR] {self.get_protocol_name()}: {e}")
            return message
    
    def get_protocol_name(self) -> str:
        return "my_protocol"
    
    def _encode(self, message: str) -> str:
        """Encode JSON/text to protocol format"""
        # Implementation
        pass
    
    def _decode(self, message: str) -> str:
        """Decode protocol format to JSON/text"""
        # Implementation
        pass

Sources: wshawk/plugin_system.py:86-96, plugins/README.md:12-28


Advanced Topics

Stateful Protocol Handlers

For protocols requiring connection state (sequence numbers, session keys):

class StatefulProtocolHandler(ProtocolPlugin):
    def __init__(self):
        self._sequence_number = 0
        self._session_key = None
    
    async def handle_message(self, message: str, context: Dict = None) -> str:
        # Increment sequence for each outbound message
        if context and context.get('direction') == 'outbound':
            self._sequence_number += 1
            header = f"SEQ:{self._sequence_number}|"
            return header + message
        return message
    
    def reset_state(self):
        """Call between connections"""
        self._sequence_number = 0
        self._session_key = None

Multi-Protocol Support

A single plugin can handle multiple protocol variants:

class MultiProtocolHandler(ProtocolPlugin):
    SUPPORTED_PROTOCOLS = ['proto_v1', 'proto_v2', 'proto_v3']
    
    async def handle_message(self, message: str, context: Dict = None) -> str:
        context = context or {}
        protocol_version = context.get('protocol_version', 'proto_v1')
        
        if protocol_version == 'proto_v1':
            return self._handle_v1(message, context)
        elif protocol_version == 'proto_v2':
            return self._handle_v2(message, context)
        elif protocol_version == 'proto_v3':
            return self._handle_v3(message, context)
        else:
            return message

Schema Evolution

For protocols with schema versioning:

class SchemaVersionedProtocol(ProtocolPlugin):
    def __init__(self):
        self._schemas = {
            1: self._load_schema_v1(),
            2: self._load_schema_v2(),
            3: self._load_schema_v3()
        }
    
    async def handle_message(self, message: str, context: Dict = None) -> str:
        schema_version = self._detect_schema_version(message)
        schema = self._schemas.get(schema_version, self._schemas[1])
        
        return self._transform_with_schema(message, schema, context)

Sources: wshawk/plugin_system.py:86-96


Summary

Protocol plugins extend WSHawk's capability to handle non-standard WebSocket message formats. Key implementation requirements:

  1. Inherit from ProtocolPlugin base class
  2. Implement get_metadata() returning PluginMetadata
  3. Implement async handle_message(message, context) for bidirectional transformation
  4. Optionally override get_protocol_name() for protocol identification
  5. Place plugin file in plugins/ directory for automatic discovery
  6. Handle errors gracefully, returning original message on failure
  7. Validate context parameters with defaults
  8. Consider thread safety for shared state
  9. Test with unit tests and integration tests

For plugin validation and versioning details, see Plugin Validation and Versioning. For complete plugin system architecture, see Plugin Architecture Overview.

Sources: wshawk/plugin_system.py:86-96, wshawk/plugin_system.py:17-33, wshawk/plugin_system.py:98-439, plugins/README.md:1-37