Skip to content

Configuration

JetSocket uses dataclasses for type-safe configuration.

BackoffConfig

Controls reconnection backoff behavior.

Configuration for exponential backoff.

Attributes:

Name Type Description
base float

Initial delay in seconds.

multiplier float

Exponential multiplier applied each attempt.

cap float

Maximum delay in seconds.

jitter bool

Whether to add random jitter (recommended).

max_attempts int

Maximum number of retry attempts (0 = infinite).

reset_after float

Reset attempt counter after this many seconds connected.

Source code in src/jetsocket/backoff.py
@dataclass(frozen=True, slots=True)
class BackoffConfig:
    """Configuration for exponential backoff.

    Attributes:
        base: Initial delay in seconds.
        multiplier: Exponential multiplier applied each attempt.
        cap: Maximum delay in seconds.
        jitter: Whether to add random jitter (recommended).
        max_attempts: Maximum number of retry attempts (0 = infinite).
        reset_after: Reset attempt counter after this many seconds connected.
    """

    base: float = 1.0
    multiplier: float = 2.0
    cap: float = 60.0
    jitter: bool = True
    max_attempts: int = 0
    reset_after: float = 60.0

    def __post_init__(self) -> None:
        """Validate configuration values."""
        if self.base <= 0:
            msg = "base must be positive"
            raise ValueError(msg)
        if self.multiplier < 1:
            msg = "multiplier must be >= 1"
            raise ValueError(msg)
        if self.cap < self.base:
            msg = "cap must be >= base"
            raise ValueError(msg)
        if self.max_attempts < 0:
            msg = "max_attempts must be >= 0"
            raise ValueError(msg)
        if self.reset_after < 0:
            msg = "reset_after must be >= 0"
            raise ValueError(msg)

__post_init__

__post_init__() -> None

Validate configuration values.

Source code in src/jetsocket/backoff.py
def __post_init__(self) -> None:
    """Validate configuration values."""
    if self.base <= 0:
        msg = "base must be positive"
        raise ValueError(msg)
    if self.multiplier < 1:
        msg = "multiplier must be >= 1"
        raise ValueError(msg)
    if self.cap < self.base:
        msg = "cap must be >= base"
        raise ValueError(msg)
    if self.max_attempts < 0:
        msg = "max_attempts must be >= 0"
        raise ValueError(msg)
    if self.reset_after < 0:
        msg = "reset_after must be >= 0"
        raise ValueError(msg)

Parameters

Parameter Type Default Description
base float 1.0 Initial delay in seconds
multiplier float 2.0 Delay multiplier per attempt
cap float 60.0 Maximum delay in seconds
jitter bool True Add random jitter to prevent thundering herd
max_attempts int 0 Maximum attempts (0 = infinite)
reset_after float 60.0 Reset backoff after N seconds of successful connection

Example

from jetsocket import BackoffConfig

# Aggressive reconnect for trading
backoff = BackoffConfig(
    base=0.5,
    multiplier=2.0,
    cap=30.0,
    jitter=True,
    max_attempts=0,  # Never give up
)

# Quick fail for user-facing apps
backoff = BackoffConfig(
    base=1.0,
    multiplier=2.0,
    cap=10.0,
    max_attempts=5,  # Give up after 5 attempts
)

HeartbeatConfig

Controls ping/pong heartbeat behavior.

Configuration for heartbeat behavior.

Parameters:

Name Type Description Default
interval float

Seconds between ping frames.

30.0
timeout float

Seconds to wait for pong before considering connection dead.

10.0
payload bytes

Custom payload for ping frames.

b''
use_ws_ping bool

If True, use WebSocket-level PING frames (RFC 6455).

True
application_ping Callable[[], bytes | str | dict[str, Any]] | None

Custom function to generate application-level ping.

None
pong_matcher Callable[[Any], bool] | None

Custom function to detect application-level pong responses.

None
Source code in src/jetsocket/heartbeat.py
@dataclass(frozen=True, slots=True)
class HeartbeatConfig:
    """Configuration for heartbeat behavior.

    Args:
        interval: Seconds between ping frames.
        timeout: Seconds to wait for pong before considering connection dead.
        payload: Custom payload for ping frames.
        use_ws_ping: If True, use WebSocket-level PING frames (RFC 6455).
        application_ping: Custom function to generate application-level ping.
        pong_matcher: Custom function to detect application-level pong responses.
    """

    interval: float = 30.0
    timeout: float = 10.0
    payload: bytes = b""
    use_ws_ping: bool = True
    application_ping: Callable[[], bytes | str | dict[str, Any]] | None = None
    pong_matcher: Callable[[Any], bool] | None = None

    def __post_init__(self) -> None:
        """Validate configuration."""
        if self.interval <= 0:
            msg = f"interval must be positive, got {self.interval}"
            raise ValueError(msg)
        if self.timeout <= 0:
            msg = f"timeout must be positive, got {self.timeout}"
            raise ValueError(msg)
        if self.timeout >= self.interval:
            msg = (
                f"timeout ({self.timeout}) must be less than interval ({self.interval})"
            )
            raise ValueError(msg)
        if self.application_ping is not None and self.use_ws_ping:
            msg = "cannot use both use_ws_ping=True and application_ping"
            raise ValueError(msg)

__post_init__

__post_init__() -> None

Validate configuration.

Source code in src/jetsocket/heartbeat.py
def __post_init__(self) -> None:
    """Validate configuration."""
    if self.interval <= 0:
        msg = f"interval must be positive, got {self.interval}"
        raise ValueError(msg)
    if self.timeout <= 0:
        msg = f"timeout must be positive, got {self.timeout}"
        raise ValueError(msg)
    if self.timeout >= self.interval:
        msg = (
            f"timeout ({self.timeout}) must be less than interval ({self.interval})"
        )
        raise ValueError(msg)
    if self.application_ping is not None and self.use_ws_ping:
        msg = "cannot use both use_ws_ping=True and application_ping"
        raise ValueError(msg)

Parameters

Parameter Type Default Description
interval float 30.0 Send ping every N seconds
timeout float 10.0 Wait N seconds for pong
payload bytes None Custom ping payload
use_ws_ping bool True Use WebSocket ping frames
use_application_ping bool False Send as application message
pong_matcher Callable None Function to detect pong in messages

Example

from jetsocket import HeartbeatConfig

# Standard WebSocket ping
heartbeat = HeartbeatConfig(interval=20.0, timeout=10.0)

# Application-level ping (e.g., Bybit)
heartbeat = HeartbeatConfig(
    interval=20.0,
    timeout=10.0,
    payload=b'{"op": "ping"}',
    use_ws_ping=False,
    use_application_ping=True,
    pong_matcher=lambda msg: msg.get("op") == "pong",
)

BufferConfig

Controls message buffering behavior.

Configuration for message buffering.

Parameters:

Name Type Description Default
capacity int

Maximum number of messages to buffer.

1000
overflow_policy OverflowPolicyType

Policy when buffer is full. - 'drop_oldest': Remove oldest message (FIFO eviction). - 'drop_newest': Discard incoming message. - 'error': Raise BufferOverflowError.

'drop_oldest'
enable_dedup bool

Whether to deduplicate messages by sequence ID.

False
dedup_window int

Number of recent sequence IDs to track for deduplication.

100
Source code in src/jetsocket/buffer.py
@dataclass(frozen=True, slots=True)
class BufferConfig:
    """Configuration for message buffering.

    Args:
        capacity: Maximum number of messages to buffer.
        overflow_policy: Policy when buffer is full.
            - 'drop_oldest': Remove oldest message (FIFO eviction).
            - 'drop_newest': Discard incoming message.
            - 'error': Raise BufferOverflowError.
        enable_dedup: Whether to deduplicate messages by sequence ID.
        dedup_window: Number of recent sequence IDs to track for deduplication.
    """

    capacity: int = 1000
    overflow_policy: OverflowPolicyType = "drop_oldest"
    enable_dedup: bool = False
    dedup_window: int = 100

    def __post_init__(self) -> None:
        """Validate configuration."""
        if self.capacity <= 0:
            msg = f"capacity must be positive, got {self.capacity}"
            raise ValueError(msg)
        if self.overflow_policy not in ("drop_oldest", "drop_newest", "error"):
            msg = f"Invalid overflow_policy: {self.overflow_policy}"
            raise ValueError(msg)
        if self.dedup_window <= 0:
            msg = f"dedup_window must be positive, got {self.dedup_window}"
            raise ValueError(msg)

__post_init__

__post_init__() -> None

Validate configuration.

Source code in src/jetsocket/buffer.py
def __post_init__(self) -> None:
    """Validate configuration."""
    if self.capacity <= 0:
        msg = f"capacity must be positive, got {self.capacity}"
        raise ValueError(msg)
    if self.overflow_policy not in ("drop_oldest", "drop_newest", "error"):
        msg = f"Invalid overflow_policy: {self.overflow_policy}"
        raise ValueError(msg)
    if self.dedup_window <= 0:
        msg = f"dedup_window must be positive, got {self.dedup_window}"
        raise ValueError(msg)

Parameters

Parameter Type Default Description
capacity int 1000 Maximum messages to buffer
overflow_policy str "drop_oldest" Policy when full: "drop_oldest", "drop_newest", "error"
enable_dedup bool False Enable deduplication
dedup_window int 100 Track last N sequence IDs

Example

from jetsocket import BufferConfig

# Trading: large buffer with dedup
buffer = BufferConfig(
    capacity=10000,
    overflow_policy="drop_oldest",
    enable_dedup=True,
    dedup_window=1000,
)

# Dashboard: small buffer, drop stale
buffer = BufferConfig(
    capacity=100,
    overflow_policy="drop_oldest",
)

ReplayConfig

Controls replay-on-reconnect behavior.

Configuration for message replay on reconnection.

Parameters:

Name Type Description Default
mode ReplayModeType

Replay strategy. - 'none': No replay. - 'sequence_id': Call on_replay callback with last sequence ID. - 'full_buffer': Resend all buffered messages.

'none'
sequence_extractor Callable[[Any], SequenceId] | None

Function to extract sequence ID from a message.

None
on_replay Callable[[SequenceId], Awaitable[None]] | None

Async callback invoked with last sequence ID (for sequence_id mode).

None
Source code in src/jetsocket/buffer.py
@dataclass(frozen=True, slots=True)
class ReplayConfig:
    """Configuration for message replay on reconnection.

    Args:
        mode: Replay strategy.
            - 'none': No replay.
            - 'sequence_id': Call on_replay callback with last sequence ID.
            - 'full_buffer': Resend all buffered messages.
        sequence_extractor: Function to extract sequence ID from a message.
        on_replay: Async callback invoked with last sequence ID (for sequence_id mode).
    """

    mode: ReplayModeType = "none"
    sequence_extractor: Callable[[Any], SequenceId] | None = None
    on_replay: Callable[[SequenceId], Awaitable[None]] | None = None

    def __post_init__(self) -> None:
        """Validate configuration."""
        if self.mode not in ("none", "sequence_id", "full_buffer"):
            msg = f"Invalid replay mode: {self.mode}"
            raise ValueError(msg)
        if self.mode == "sequence_id" and self.on_replay is None:
            msg = "on_replay callback is required for sequence_id mode"
            raise ValueError(msg)

__post_init__

__post_init__() -> None

Validate configuration.

Source code in src/jetsocket/buffer.py
def __post_init__(self) -> None:
    """Validate configuration."""
    if self.mode not in ("none", "sequence_id", "full_buffer"):
        msg = f"Invalid replay mode: {self.mode}"
        raise ValueError(msg)
    if self.mode == "sequence_id" and self.on_replay is None:
        msg = "on_replay callback is required for sequence_id mode"
        raise ValueError(msg)

Parameters

Parameter Type Default Description
mode str "none" Replay mode: "none", "sequence_id", "full_buffer"
sequence_extractor Callable None Function to extract sequence ID from message
on_replay Callable None Callback when replay is needed

Example

from jetsocket import ReplayConfig

async def request_replay(last_seq):
    await ws.send({"action": "replay", "from": last_seq})

replay = ReplayConfig(
    mode="sequence_id",
    sequence_extractor=lambda msg: msg.get("seq"),
    on_replay=request_replay,
)

ConnectionPoolConfig

See ConnectionPool API reference for connection pool configuration.