Skip to content

Typed Messages

JetSocket supports type-safe message handling with Pydantic models.

Installation

Pydantic is an optional dependency:

pip install jetsocket[pydantic]
# or
pip install pydantic

WebSocket with message_type

Pass a Pydantic model via message_type= for automatic validation and type hints:

from pydantic import BaseModel
from jetsocket import WebSocket

class TradeMessage(BaseModel):
    symbol: str
    price: float
    quantity: float

async with WebSocket("wss://stream.example.com/ws", message_type=TradeMessage) as ws:
    async for trade in ws:  # trade: TradeMessage
        print(f"{trade.symbol}: ${trade.price:.2f} x {trade.quantity}")

Defining Message Types

Use Pydantic models to define your message schemas:

from pydantic import BaseModel, Field
from datetime import datetime

class OrderUpdate(BaseModel):
    order_id: str = Field(alias="orderId")
    status: str
    filled_qty: float = Field(alias="filledQty")
    timestamp: datetime

Field Aliases

Many APIs use different naming conventions. Use Field(alias=...):

class BinanceTrade(BaseModel):
    event_type: str = Field(alias="e")
    event_time: int = Field(alias="E")
    symbol: str = Field(alias="s")
    trade_id: int = Field(alias="t")
    price: str = Field(alias="p")
    quantity: str = Field(alias="q")

Nested Models

Complex message structures work automatically:

class MarketData(BaseModel):
    bid: PriceLevel
    ask: PriceLevel
    last_trade: TradeInfo

class PriceLevel(BaseModel):
    price: float
    quantity: float

class TradeInfo(BaseModel):
    price: float
    quantity: float
    timestamp: int

Validation Modes

Strict Mode (default)

Raises ValidationError on invalid messages:

ws = WebSocket("wss://example.com/ws", message_type=TradeMessage, strict=True)

Non-Strict Mode

Logs warnings and skips invalid messages:

ws = WebSocket("wss://example.com/ws", message_type=TradeMessage, strict=False)

# Check validation error count
print(f"Validation errors: {ws.validation_errors}")

Sending Typed Messages

You can send Pydantic models directly:

class SubscribeRequest(BaseModel):
    method: str
    params: list[str]

request = SubscribeRequest(method="SUBSCRIBE", params=["btcusdt@trade"])
await ws.send(request)  # Automatically serialized to JSON

Combining with Configuration

# Create a typed trading client
ws = WebSocket(
    "wss://stream.binance.com/ws",
    message_type=TradeMessage,
    reconnect=True,
    heartbeat=HeartbeatConfig(interval=20.0),
    buffer=BufferConfig(capacity=10000),
)

Full Example

import asyncio
from pydantic import BaseModel, Field
from jetsocket import WebSocket, HeartbeatConfig

class BinanceTrade(BaseModel):
    event_type: str = Field(alias="e")
    symbol: str = Field(alias="s")
    price: str = Field(alias="p")
    quantity: str = Field(alias="q")
    trade_time: int = Field(alias="T")
    is_buyer_maker: bool = Field(alias="m")

async def main():
    async with WebSocket(
        "wss://stream.binance.com:9443/ws/btcusdt@trade",
        message_type=BinanceTrade,
        heartbeat=HeartbeatConfig(interval=20.0),
    ) as ws:
        async for trade in ws:
            side = "SELL" if trade.is_buyer_maker else "BUY"
            print(f"{trade.symbol} {side}: {trade.price} x {trade.quantity}")

if __name__ == "__main__":
    asyncio.run(main())

Type Checking Benefits

With WebSocket(message_type=...), your IDE provides:

  • Autocomplete for message fields
  • Type error detection
  • Documentation on hover
async for trade in ws:
    trade.symbol    # Autocomplete works
    trade.invalid   # Type error detected