Skip to content

WebSocket with message_type

WebSocket client with Pydantic message validation via the message_type parameter.

Bases: WebSocket[T], Generic[T]

WebSocket with typed messages via Pydantic.

Provides automatic serialization/deserialization of messages using Pydantic models. Messages are validated on receive and serialized on send.

Example

from pydantic import BaseModel

class TradeMessage(BaseModel): ... symbol: str ... price: float ... quantity: float

async with TypedWebSocket( ... "wss://stream.example.com/ws", TradeMessage ... ) as ws: ... async for trade in ws: # trade is TradeMessage ... print(f"{trade.symbol}: {trade.price}")

Parameters:

Name Type Description Default
uri str

WebSocket URI.

required
message_type type[T]

Pydantic model class for incoming messages.

required
strict bool

If True, raise ValidationError on invalid messages. If False, skip invalid messages with warning.

True
**kwargs Any

Additional kwargs passed to WebSocketManager.

{}

Raises:

Type Description
ImportError

If pydantic is not installed.

Source code in src/jetsocket/typed.py
class TypedWebSocket(WebSocket[T], Generic[T]):
    """WebSocket with typed messages via Pydantic.

    Provides automatic serialization/deserialization of messages
    using Pydantic models. Messages are validated on receive and
    serialized on send.

    Example:
        >>> from pydantic import BaseModel
        >>>
        >>> class TradeMessage(BaseModel):
        ...     symbol: str
        ...     price: float
        ...     quantity: float
        >>>
        >>> async with TypedWebSocket(
        ...     "wss://stream.example.com/ws", TradeMessage
        ... ) as ws:
        ...     async for trade in ws:  # trade is TradeMessage
        ...         print(f"{trade.symbol}: {trade.price}")

    Args:
        uri: WebSocket URI.
        message_type: Pydantic model class for incoming messages.
        strict: If True, raise ValidationError on invalid messages.
            If False, skip invalid messages with warning.
        **kwargs: Additional kwargs passed to WebSocketManager.

    Raises:
        ImportError: If pydantic is not installed.
    """

    def __init__(
        self,
        uri: str,
        message_type: type[T],
        *,
        strict: bool = True,
        **kwargs: Any,
    ) -> None:
        """Initialize typed WebSocket.

        Args:
            uri: WebSocket URI.
            message_type: Pydantic model class for incoming messages.
            strict: If True, raise ValidationError on invalid messages.
            **kwargs: Additional kwargs passed to WebSocketManager.

        Raises:
            ImportError: If pydantic is not installed.
        """
        try:
            from pydantic import BaseModel  # noqa: PLC0415
        except ImportError as e:
            raise ImportError(
                "pydantic is required for TypedWebSocket. "
                "Install it with: pip install pydantic"
            ) from e

        self._strict = strict
        self._validation_errors = 0

        # Set up deserializer
        def deserializer(data: bytes) -> T:
            return message_type.model_validate_json(data)  # type: ignore[no-any-return]

        # Set up serializer
        def serializer(msg: Any) -> bytes:
            if isinstance(msg, BaseModel):
                return msg.model_dump_json().encode("utf-8")  # type: ignore[no-any-return]
            # Allow raw dict/Any for flexibility
            return json.dumps(msg).encode("utf-8")

        super().__init__(
            uri,
            message_type=message_type,
            deserializer=deserializer,
            serializer=serializer,
            **kwargs,
        )

    @property
    def message_type(self) -> type[T]:
        """Get the message type class."""
        return self._message_type  # type: ignore[return-value]

    @property
    def strict(self) -> bool:
        """Check if strict mode is enabled."""
        return self._strict

    @property
    def validation_errors(self) -> int:
        """Get the count of validation errors (non-strict mode only)."""
        return self._validation_errors

message_type property

message_type: type[T]

Get the message type class.

validation_errors property

validation_errors: int

Get the count of validation errors (non-strict mode only).

__init__

__init__(
    uri: str,
    message_type: type[T],
    *,
    strict: bool = True,
    **kwargs: Any,
) -> None

Initialize typed WebSocket.

Parameters:

Name Type Description Default
uri str

WebSocket URI.

required
message_type type[T]

Pydantic model class for incoming messages.

required
strict bool

If True, raise ValidationError on invalid messages.

True
**kwargs Any

Additional kwargs passed to WebSocketManager.

{}

Raises:

Type Description
ImportError

If pydantic is not installed.

Source code in src/jetsocket/typed.py
def __init__(
    self,
    uri: str,
    message_type: type[T],
    *,
    strict: bool = True,
    **kwargs: Any,
) -> None:
    """Initialize typed WebSocket.

    Args:
        uri: WebSocket URI.
        message_type: Pydantic model class for incoming messages.
        strict: If True, raise ValidationError on invalid messages.
        **kwargs: Additional kwargs passed to WebSocketManager.

    Raises:
        ImportError: If pydantic is not installed.
    """
    try:
        from pydantic import BaseModel  # noqa: PLC0415
    except ImportError as e:
        raise ImportError(
            "pydantic is required for TypedWebSocket. "
            "Install it with: pip install pydantic"
        ) from e

    self._strict = strict
    self._validation_errors = 0

    # Set up deserializer
    def deserializer(data: bytes) -> T:
        return message_type.model_validate_json(data)  # type: ignore[no-any-return]

    # Set up serializer
    def serializer(msg: Any) -> bytes:
        if isinstance(msg, BaseModel):
            return msg.model_dump_json().encode("utf-8")  # type: ignore[no-any-return]
        # Allow raw dict/Any for flexibility
        return json.dumps(msg).encode("utf-8")

    super().__init__(
        uri,
        message_type=message_type,
        deserializer=deserializer,
        serializer=serializer,
        **kwargs,
    )

Installation

Typed messages require Pydantic:

pip install jetsocket[pydantic]
# or
pip install pydantic

Usage

from pydantic import BaseModel
from jetsocket import WebSocket

class TradeMessage(BaseModel):
    symbol: str
    price: float
    quantity: float

async with WebSocket("wss://stream.example.com/ws", message_type=TradeMessage) as ws:
    async for trade in ws:  # trade: TradeMessage
        print(f"{trade.symbol}: ${trade.price:.2f}")

Validation Modes

Strict Mode (default)

Raises ValidationError on invalid messages:

ws = WebSocket("wss://example.com/ws", message_type=TradeMessage, strict=True)

Non-Strict Mode

Skips invalid messages with a warning:

ws = WebSocket("wss://example.com/ws", message_type=TradeMessage, strict=False)

# Check error count
print(f"Validation errors: {ws.validation_errors}")

Sending Typed Messages

Pydantic models are automatically serialized:

class SubscribeRequest(BaseModel):
    method: str
    params: list[str]

request = SubscribeRequest(method="SUBSCRIBE", params=["btcusdt@trade"])
await ws.send(request)  # Serialized to JSON

Field Aliases

Use Field(alias=...) for APIs with different naming conventions:

from pydantic import BaseModel, Field

class BinanceTrade(BaseModel):
    event_type: str = Field(alias="e")
    symbol: str = Field(alias="s")
    price: str = Field(alias="p")
    quantity: str = Field(alias="q")

Type Checking Benefits

With WebSocket(message_type=...), your IDE provides:

  • Autocomplete for message fields
  • Type error detection
  • Documentation on hover
async for trade in ws:
    trade.symbol    # Autocomplete works
    trade.invalid   # Type error detected