Message Buffering¶
JetSocket can buffer received messages with sequence tracking and deduplication.
Basic Configuration¶
from jetsocket import WebSocket, BufferConfig

ws = WebSocket(
    "wss://example.com/ws",
    buffer=BufferConfig(
        capacity=1000,  # Max messages to buffer
        overflow_policy="drop_oldest",  # Policy when full
    ),
)
Overflow Policies¶
| Policy | Behavior |
|---|---|
| drop_oldest | Remove oldest message, add new (FIFO eviction) |
| drop_newest | Discard incoming message |
| error | Raise BufferOverflowError |
# Discard new messages when full
buffer = BufferConfig(
    capacity=100,
    overflow_policy="drop_newest",
)
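To make the three policies concrete, here is a minimal standalone sketch of a bounded buffer that applies them. It is not JetSocket's implementation: `BoundedBuffer`, `push`, `snapshot`, and `fill` are hypothetical names for illustration, and `BufferOverflowError` is redefined locally as a stand-in so the example runs without the library.

```python
from collections import deque


class BufferOverflowError(Exception):
    """Local stand-in for the library error of the same name (illustration only)."""


class BoundedBuffer:
    """Toy bounded buffer showing the three overflow policies (hypothetical class)."""

    POLICIES = ("drop_oldest", "drop_newest", "error")

    def __init__(self, capacity, overflow_policy="drop_oldest"):
        if capacity <= 0:
            raise ValueError("capacity must be positive")
        if overflow_policy not in self.POLICIES:
            raise ValueError(f"unknown policy: {overflow_policy}")
        self.capacity = capacity
        self.overflow_policy = overflow_policy
        self.total_dropped = 0
        self._items = deque()

    def __len__(self):
        return len(self._items)

    @property
    def fill_ratio(self):
        # Fraction of the capacity currently in use, between 0.0 and 1.0.
        return len(self._items) / self.capacity

    def push(self, message):
        if len(self._items) < self.capacity:
            self._items.append(message)
            return
        if self.overflow_policy == "drop_oldest":
            # Evict the oldest message to make room for the new one.
            self._items.popleft()
            self._items.append(message)
            self.total_dropped += 1
        elif self.overflow_policy == "drop_newest":
            # Keep the buffer as-is and discard the incoming message.
            self.total_dropped += 1
        else:
            raise BufferOverflowError("buffer is full")

    def snapshot(self):
        return list(self._items)


def fill(policy):
    """Push messages 1..5 into a 3-slot buffer and report what survives."""
    buf = BoundedBuffer(capacity=3, overflow_policy=policy)
    for seq in range(1, 6):
        buf.push(seq)
    return buf.snapshot(), buf.total_dropped
```

With a capacity of 3 and five incoming messages, `fill("drop_oldest")` keeps `[3, 4, 5]`, `fill("drop_newest")` keeps `[1, 2, 3]`, and both count two dropped messages; `fill("error")` raises on the fourth push.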
Deduplication¶
Enable deduplication to discard messages that have already been received:
buffer = BufferConfig(
    capacity=1000,
    enable_dedup=True,
    dedup_window=100,  # Track last 100 sequence IDs
)
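The effect of a bounded dedup window can be sketched in a few lines. This is an assumption about how such a window typically behaves, not a description of JetSocket's internals; `DedupWindow`, `is_duplicate`, and `dedupe` are hypothetical names.

```python
from collections import OrderedDict


class DedupWindow:
    """Remembers the most recent `window` sequence IDs (hypothetical helper)."""

    def __init__(self, window=100):
        if window <= 0:
            raise ValueError("window must be positive")
        self.window = window
        self._seen = OrderedDict()  # insertion-ordered set of sequence IDs

    def is_duplicate(self, seq_id):
        if seq_id in self._seen:
            return True
        self._seen[seq_id] = None
        if len(self._seen) > self.window:
            # Forget the oldest tracked ID once the window is exceeded.
            self._seen.popitem(last=False)
        return False


def dedupe(seq_ids, window):
    """Return the sequence IDs that would be accepted, in arrival order."""
    tracker = DedupWindow(window)
    return [s for s in seq_ids if not tracker.is_duplicate(s)]
```

In this sketch, `dedupe([1, 2, 2, 3, 1], window=100)` accepts `[1, 2, 3]`. With `window=2`, the input `[1, 2, 3, 1]` is accepted in full, because ID 1 has already been evicted by the time it reappears. A window that is too small relative to how late duplicates can arrive lets them through, which is the trade-off to weigh when sizing `dedup_window`.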
Replay on Reconnect¶
JetSocket can replay messages after reconnection:
from jetsocket import WebSocket, BufferConfig, ReplayConfig

# Define the callback before it is passed to ReplayConfig.
# `ws` is resolved when the callback runs, after the socket exists.
async def on_replay_callback(last_seq):
    # Request missed messages from server
    await ws.send({"action": "replay", "from": last_seq})

ws = WebSocket(
    "wss://example.com/ws",
    buffer=BufferConfig(capacity=1000),
    replay=ReplayConfig(
        mode="sequence_id",
        sequence_extractor=lambda msg: msg.get("seq"),
        on_replay=on_replay_callback,
    ),
)
Replay Modes¶
| Mode | Description |
|---|---|
| none | No replay (default) |
| sequence_id | Call on_replay with last sequence ID |
| full_buffer | Resend all buffered messages |
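The following standalone sketch models how the three modes differ at reconnect time. It is a toy model under stated assumptions, not the library's logic: `ReplayPlanner`, `on_message`, and `plan_reconnect` are hypothetical names, and for `full_buffer` the sketch only returns the buffered messages, since where they are resent is not specified here.

```python
class ReplayPlanner:
    """Toy model of the replay modes (hypothetical, for illustration only)."""

    MODES = ("none", "sequence_id", "full_buffer")

    def __init__(self, mode="none", sequence_extractor=None):
        if mode not in self.MODES:
            raise ValueError(f"unknown mode: {mode}")
        self.mode = mode
        self.sequence_extractor = sequence_extractor
        self.last_seq = None
        self.buffered = []

    def on_message(self, msg):
        self.buffered.append(msg)
        if self.sequence_extractor is not None:
            seq = self.sequence_extractor(msg)
            # Messages without a sequence ID leave last_seq untouched.
            if seq is not None:
                self.last_seq = seq

    def plan_reconnect(self):
        """Describe what would happen after a reconnect in the current mode."""
        if self.mode == "sequence_id":
            return ("on_replay", self.last_seq)
        if self.mode == "full_buffer":
            return ("resend", list(self.buffered))
        return ("nothing", None)


def simulate(mode):
    planner = ReplayPlanner(mode, sequence_extractor=lambda m: m.get("seq"))
    for msg in ({"seq": 1}, {"seq": 2}, {"type": "ping"}):
        planner.on_message(msg)
    return planner.plan_reconnect()
```

Here `simulate("sequence_id")` yields `("on_replay", 2)`: the ping carries no `seq`, so the last known sequence ID stays at 2, and that is the value a callback would use to request missed messages.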
Buffer Statistics¶
# Access buffer directly
buffer = ws.buffer
if buffer:
    print(f"Messages in buffer: {len(buffer)}")
    print(f"Fill ratio: {buffer.fill_ratio:.1%}")
    print(f"Dropped messages: {buffer.total_dropped}")

# From connection stats
stats = ws.stats()
print(f"Buffer fill: {stats.buffer_fill_ratio:.1%}")
Buffer Events¶
@ws.on("buffer_overflow")
async def on_overflow(event):
    print(f"Buffer overflow: dropped {event.dropped_count} messages")
    print(f"Policy: {event.policy}")

@ws.on("replay_started")
async def on_replay_start(event):
    print(f"Replay started: mode={event.mode}, messages={event.message_count}")

@ws.on("replay_completed")
async def on_replay_done(event):
    print(f"Replay completed: {event.replayed_count} messages in {event.duration_ms}ms")
Recommended Settings¶
| Use Case | Capacity | Policy |
|---|---|---|
| Trading | 10,000 | drop_oldest + dedup |
| LLM Streaming | 500 | drop_oldest |
| Dashboard | 100 | drop_oldest |
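One way to keep these recommendations in code is a small lookup of keyword arguments, sketched below. The dictionary and the `buffer_kwargs` helper are hypothetical; the keys mirror the `BufferConfig` parameters shown earlier (`capacity`, `overflow_policy`, `enable_dedup`), and no `dedup_window` is set because the table does not recommend one.

```python
# Keyword arguments per use case, taken from the table above (illustrative only).
RECOMMENDED_BUFFER_SETTINGS = {
    "trading": {
        "capacity": 10_000,
        "overflow_policy": "drop_oldest",
        "enable_dedup": True,
    },
    "llm_streaming": {"capacity": 500, "overflow_policy": "drop_oldest"},
    "dashboard": {"capacity": 100, "overflow_policy": "drop_oldest"},
}


def buffer_kwargs(use_case):
    """Return a copy of the recommended settings for a use case."""
    try:
        return dict(RECOMMENDED_BUFFER_SETTINGS[use_case])
    except KeyError:
        raise ValueError(f"no recommendation for use case: {use_case!r}") from None
```

The result can then be unpacked into the configuration, for example `BufferConfig(**buffer_kwargs("trading"))`.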