Skip to content

Connection Pool

JetSocket provides connection pooling for managing multiple WebSocket connections efficiently.

Basic Usage

from jetsocket import ConnectionPool, ConnectionPoolConfig

config = ConnectionPoolConfig(
    max_connections=10,
    max_idle_time=300.0,      # Close idle connections after 5 min
    health_check_interval=30.0,
    acquire_timeout=10.0,
)

async with ConnectionPool(config, base_uri="wss://stream.example.com") as pool:
    async with pool.acquire("/ws/btcusdt") as conn:
        await conn.send({"subscribe": "trades"})
        async for msg in conn:
            process(msg)

Configuration Options

Option Default Description
max_connections 10 Maximum concurrent connections
max_idle_time 300.0 Close idle connections after N seconds
health_check_interval 30.0 Interval between health checks
acquire_timeout 10.0 Timeout for acquiring a connection

Acquiring Connections

# With path (appended to base_uri)
async with pool.acquire("/ws/btcusdt") as conn:
    ...

# With full URI
async with pool.acquire("wss://stream.example.com/ws/btcusdt") as conn:
    ...

# With timeout
async with pool.acquire("/ws/btcusdt", timeout=5.0) as conn:
    ...

Connection Reuse

Connections are reused when released back to the pool:

async with pool.acquire("/ws/btcusdt") as conn1:
    # Use connection
    pass

# Connection returned to pool

async with pool.acquire("/ws/btcusdt") as conn2:
    # May be the same physical connection as conn1
    pass

Pool Statistics

stats = pool.stats()
print(f"Total connections: {stats.total_connections}")
print(f"Active: {stats.active_connections}")
print(f"Idle: {stats.idle_connections}")
print(f"Messages sent: {stats.total_messages_sent}")
print(f"Messages received: {stats.total_messages_received}")

Health Checks

The pool automatically performs health checks on idle connections:

  • Connections that fail health checks are removed
  • New connections are created on demand
  • Health check interval is configurable

Pool Exhaustion

If all connections are in use:

try:
    async with pool.acquire("/ws/btcusdt", timeout=5.0) as conn:
        ...
except PoolExhaustedError as e:
    print(f"Pool exhausted: max={e.max_connections}, timeout={e.timeout}s")

Custom WebSocket Configuration

Pass additional WebSocket options:

pool = ConnectionPool(
    config,
    base_uri="wss://stream.example.com",
    manager_kwargs={
        "reconnect": True,
        "heartbeat": HeartbeatConfig(interval=20.0),
    },
)

Full Example

import asyncio
from jetsocket import ConnectionPool, ConnectionPoolConfig, HeartbeatConfig

async def process_stream(pool, symbol):
    async with pool.acquire(f"/ws/{symbol}") as conn:
        await conn.send({"subscribe": "trades"})
        async for msg in conn:
            print(f"{symbol}: {msg}")

async def main():
    config = ConnectionPoolConfig(max_connections=10)

    async with ConnectionPool(
        config,
        base_uri="wss://stream.example.com",
        manager_kwargs={
            "heartbeat": HeartbeatConfig(interval=20.0),
        },
    ) as pool:
        # Process multiple streams concurrently
        await asyncio.gather(
            process_stream(pool, "btcusdt"),
            process_stream(pool, "ethusdt"),
            process_stream(pool, "solusdt"),
        )

if __name__ == "__main__":
    asyncio.run(main())