Skip to content

ConnectionPool

Connection pool for managing multiple WebSocket connections efficiently.

Bases: Generic[T]

Pool of WebSocket instances.

Manages multiple WebSocket connections with semaphore-based limiting, connection reuse, health checking, and idle cleanup.

Example

async with ConnectionPool(base_uri="wss://stream.example.com") as pool:
    async with await pool.acquire("/ws/btcusdt") as conn:
        await conn.send({"subscribe": "trades"})
        async for msg in conn:
            process(msg)

Parameters:

Name Type Description Default
config ConnectionPoolConfig | None

Pool configuration. Uses defaults if not provided.

None
base_uri str | None

Base URI for relative paths. Required for relative paths.

None
manager_kwargs dict[str, Any] | None

Additional kwargs to pass to WebSocket.

None
Source code in src/jetsocket/pool.py
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
class ConnectionPool(Generic[T]):
    """Pool of WebSocket instances.

    Manages multiple WebSocket connections with semaphore-based limiting,
    connection reuse, health checking, and idle cleanup.

    The semaphore bounds how many connections may be *acquired* at the same
    time; idle connections kept for reuse do not hold a permit.

    Example:
        ``acquire`` is a coroutine, so it is awaited before its result is used:

        >>> async with ConnectionPool(base_uri="wss://stream.example.com") as pool:
        ...     async with await pool.acquire("/ws/btcusdt") as conn:
        ...         await conn.send({"subscribe": "trades"})
        ...         async for msg in conn:
        ...             process(msg)

    Args:
        config: Pool configuration. Uses defaults if not provided.
        base_uri: Base URI for relative paths. Required for relative paths.
        manager_kwargs: Additional kwargs to pass to WebSocket.
    """

    def __init__(
        self,
        config: ConnectionPoolConfig | None = None,
        *,
        base_uri: str | None = None,
        manager_kwargs: dict[str, Any] | None = None,
    ) -> None:
        """Initialize the connection pool.

        No connection is opened and no background task is started here;
        background tasks are started by ``__aenter__``.
        """
        self._config = config or ConnectionPoolConfig()
        self._base_uri = base_uri
        self._manager_kwargs = manager_kwargs or {}

        # Connection storage by URI
        self._connections: dict[str, list[_PooledConnectionState]] = {}
        # Only ever incremented (see _create_connection).
        self._connection_count = 0

        # Semaphore limiting how many connections are acquired concurrently
        self._semaphore = asyncio.Semaphore(self._config.max_connections)

        # Pool state
        self._closed = False
        self._lock = asyncio.Lock()

        # Background tasks
        self._health_check_task: asyncio.Task[None] | None = None
        self._cleanup_task: asyncio.Task[None] | None = None

    @property
    def is_closed(self) -> bool:
        """Return True if the pool has been closed."""
        return self._closed

    async def acquire(
        self,
        uri_or_path: str,
        timeout: float | None = None,
    ) -> PooledConnection[T]:
        """Acquire a connection from the pool.

        If a matching idle connection exists, it will be reused.
        Otherwise, a new connection will be created.

        Args:
            uri_or_path: Full URI or path relative to base_uri.
            timeout: Timeout for acquiring. Uses config default if not provided.

        Returns:
            A PooledConnection wrapper.

        Raises:
            PoolClosedError: If the pool has been closed, including while this
                call was waiting for a free slot.
            PoolExhaustedError: If no connection available within timeout.
            ValueError: If a relative path is given and no base_uri is set.
            ConnectionError: If connection fails.
        """
        if self._closed:
            raise PoolClosedError("Pool has been closed")

        timeout = timeout if timeout is not None else self._config.acquire_timeout

        # Resolve full URI (before taking a permit, so a bad path costs nothing)
        full_uri = self._resolve_uri(uri_or_path)

        # Wait for a free slot, bounded by the timeout
        try:
            await asyncio.wait_for(self._acquire_semaphore(), timeout=timeout)
        except asyncio.TimeoutError:
            raise PoolExhaustedError(
                f"Could not acquire connection within {timeout}s",
                max_connections=self._config.max_connections,
                timeout=timeout,
            ) from None

        try:
            # The pool may have been closed while we were waiting for a slot.
            if self._closed:
                raise PoolClosedError("Pool has been closed")

            dead_state: _PooledConnectionState | None = None

            # Look up and claim an idle connection atomically under the lock.
            async with self._lock:
                conn_state = self._get_idle_connection(full_uri)
                if conn_state is not None:
                    if conn_state.manager.state in (
                        ConnectionState.CONNECTED,
                        ConnectionState.RECONNECTING,
                        ConnectionState.BACKING_OFF,
                    ):
                        conn_state.mark_used()
                        return PooledConnection[T](self, conn_state)

                    # Connection is not usable: drop it from the pool
                    self._remove_connection(conn_state)
                    dead_state = conn_state

            # Close the discarded connection outside the lock, mirroring what
            # the health-check loop does, so its resources are not leaked.
            if dead_state is not None:
                await self._close_connection(dead_state)

            # Create new connection
            conn_state = await self._create_connection(full_uri)
            conn_state.mark_used()
            return PooledConnection[T](self, conn_state)

        except BaseException:
            # Give the permit back on any failure. BaseException (not just
            # Exception) so that task cancellation does not leak a permit.
            self._semaphore.release()
            raise

    async def _acquire_semaphore(self) -> bool:
        """Acquire the semaphore.

        Returns:
            Always True once the permit has been obtained.
        """
        await self._semaphore.acquire()
        return True

    async def release(self, conn: PooledConnection[T]) -> None:
        """Release a connection back to the pool.

        The semaphore permit is returned unconditionally, so this must be
        called exactly once per successful ``acquire``.

        Args:
            conn: The pooled connection to release.
        """
        state = conn._state
        state.mark_released()

        # Release semaphore
        self._semaphore.release()

        # If pool is closed or connection is dead, close it
        if self._closed or state.manager.state.is_terminal:
            await self._close_connection(state)
            async with self._lock:
                self._remove_connection(state)

    def stats(self) -> PoolStats:
        """Get aggregated pool statistics.

        ``active_connections`` and ``idle_connections`` are counted from the
        connections currently stored in the pool. ``total_connections`` comes
        from a counter that is incremented on creation and never decremented
        in this class, so it reports connections created over the pool's
        lifetime.
        NOTE(review): confirm the cumulative meaning is intended.

        Returns:
            A PoolStats instance with current statistics.
        """
        active = 0
        idle = 0
        total_sent = 0
        total_received = 0
        total_reconnects = 0

        for conn_list in self._connections.values():
            for conn_state in conn_list:
                if conn_state.in_use:
                    active += 1
                else:
                    idle += 1

                manager_stats = conn_state.manager.stats()
                total_sent += manager_stats.messages_sent
                total_received += manager_stats.messages_received
                total_reconnects += manager_stats.reconnect_count

        return PoolStats(
            total_connections=self._connection_count,
            active_connections=active,
            idle_connections=idle,
            total_messages_sent=total_sent,
            total_messages_received=total_received,
            total_reconnects=total_reconnects,
        )

    async def close(self) -> None:
        """Close the pool and all connections.

        Cancels the background tasks, then closes every stored connection
        (in use or idle) concurrently. After closing, no new connections can
        be acquired. Calling it again is harmless: there is nothing left to
        cancel or close.
        """
        self._closed = True

        # Cancel background tasks
        if self._health_check_task is not None:
            self._health_check_task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._health_check_task
            self._health_check_task = None

        if self._cleanup_task is not None:
            self._cleanup_task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._cleanup_task
            self._cleanup_task = None

        # Close all connections
        async with self._lock:
            close_tasks: list[asyncio.Task[None]] = []
            for conn_list in self._connections.values():
                for conn_state in conn_list:
                    task = asyncio.create_task(self._close_connection(conn_state))
                    close_tasks.append(task)

            if close_tasks:
                # return_exceptions=True: one failing close must not abort the rest
                await asyncio.gather(*close_tasks, return_exceptions=True)

            self._connections.clear()

    async def __aenter__(self) -> ConnectionPool[T]:
        """Async context manager entry - starts background tasks."""
        # Start background health check
        self._health_check_task = asyncio.create_task(
            self._health_check_loop(),
            name="jetsocket-pool-health-check",
        )

        # Start background cleanup
        self._cleanup_task = asyncio.create_task(
            self._cleanup_loop(),
            name="jetsocket-pool-cleanup",
        )

        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Async context manager exit - closes pool."""
        await self.close()

    # -------------------------------------------------------------------------
    # Internal Methods
    # -------------------------------------------------------------------------

    def _resolve_uri(self, uri_or_path: str) -> str:
        """Resolve a URI or path to a full URI.

        Anything that parses with a scheme is returned unchanged; everything
        else is joined onto ``base_uri``.

        Args:
            uri_or_path: Full URI or relative path.

        Returns:
            Full URI string.

        Raises:
            ValueError: If relative path without base_uri.
        """
        parsed = urlparse(uri_or_path)
        if parsed.scheme:
            # Already a full URI
            return uri_or_path

        if self._base_uri is None:
            msg = "Relative path requires base_uri to be set"
            raise ValueError(msg)

        return urljoin(self._base_uri, uri_or_path)

    def _get_idle_connection(self, uri: str) -> _PooledConnectionState | None:
        """Get an idle connection for the given URI.

        Must be called with _lock held. The returned state is not marked as
        used; the caller is responsible for claiming it.

        Args:
            uri: The connection URI.

        Returns:
            An idle connection state, or None if none available.
        """
        for conn_state in self._connections.get(uri, ()):
            if not conn_state.in_use:
                return conn_state

        return None

    async def _create_connection(self, uri: str) -> _PooledConnectionState:
        """Create a new connection.

        Connects first and registers the connection in the pool only after
        the connect call has succeeded.

        Args:
            uri: The connection URI.

        Returns:
            The new connection state.
        """
        manager: WebSocket[T] = WebSocket(uri, **self._manager_kwargs)
        await manager.connect()

        state = _PooledConnectionState(manager=manager, uri=uri)

        async with self._lock:
            self._connections.setdefault(uri, []).append(state)
            self._connection_count += 1

        return state

    async def _close_connection(self, state: _PooledConnectionState) -> None:
        """Close a connection, ignoring any error raised while closing.

        Args:
            state: The connection state to close.
        """
        # Best-effort: a connection being discarded may already be broken.
        with contextlib.suppress(Exception):
            await state.manager.close()

    def _remove_connection(self, state: _PooledConnectionState) -> None:
        """Remove a connection from the pool.

        Must be called with _lock held. Removing a connection that is no
        longer stored is a no-op. ``_connection_count`` is left untouched.

        Args:
            state: The connection state to remove.
        """
        uri = state.uri
        if uri in self._connections:
            try:
                self._connections[uri].remove(state)
                if not self._connections[uri]:
                    del self._connections[uri]
            except ValueError:
                # Already removed by another code path
                pass

    async def _health_check_loop(self) -> None:
        """Background health check loop.

        Every ``health_check_interval`` seconds, closes and removes idle
        connections whose manager state is terminal. In-use connections are
        skipped; they are handled when released.
        """
        import logging

        while not self._closed:
            try:
                await asyncio.sleep(self._config.health_check_interval)

                if self._closed:
                    break

                # Check all connections
                async with self._lock:
                    to_close: list[_PooledConnectionState] = []

                    for conn_list in self._connections.values():
                        for conn_state in conn_list:
                            # Skip in-use connections
                            if conn_state.in_use:
                                continue

                            # Check if connection is dead
                            if conn_state.manager.state.is_terminal:
                                to_close.append(conn_state)

                # Close dead connections (outside lock)
                for state in to_close:
                    await self._close_connection(state)
                    async with self._lock:
                        self._remove_connection(state)

            except asyncio.CancelledError:
                break
            except Exception:
                # Keep the loop alive, but do not hide the failure.
                logging.getLogger(__name__).warning(
                    "Connection pool health check iteration failed",
                    exc_info=True,
                )

    async def _cleanup_loop(self) -> None:
        """Background cleanup loop for idle connections.

        Every 30 seconds, closes and removes idle connections whose idle time
        exceeds ``max_idle_time``.
        """
        import logging

        while not self._closed:
            try:
                # Check every 30 seconds
                await asyncio.sleep(30.0)

                if self._closed:
                    break

                to_close: list[_PooledConnectionState] = []

                async with self._lock:
                    for conn_list in self._connections.values():
                        for conn_state in conn_list:
                            # Skip in-use connections
                            if conn_state.in_use:
                                continue

                            # Check if idle too long
                            if conn_state.idle_time > self._config.max_idle_time:
                                to_close.append(conn_state)

                # Close idle connections (outside lock)
                for state in to_close:
                    await self._close_connection(state)
                    async with self._lock:
                        self._remove_connection(state)

            except asyncio.CancelledError:
                break
            except Exception:
                # Keep the loop alive, but do not hide the failure.
                logging.getLogger(__name__).warning(
                    "Connection pool idle cleanup iteration failed",
                    exc_info=True,
                )

    def __repr__(self) -> str:
        """Return string representation."""
        stats = self.stats()
        return (
            f"ConnectionPool(max_connections={self._config.max_connections}, "
            f"active={stats.active_connections}, idle={stats.idle_connections}, "
            f"closed={self._closed})"
        )

__init__

__init__(
    config: ConnectionPoolConfig | None = None,
    *,
    base_uri: str | None = None,
    manager_kwargs: dict[str, Any] | None = None,
) -> None

Initialize the connection pool.

Source code in src/jetsocket/pool.py
def __init__(
    self,
    config: ConnectionPoolConfig | None = None,
    *,
    base_uri: str | None = None,
    manager_kwargs: dict[str, Any] | None = None,
) -> None:
    """Set up an empty, open pool; nothing is connected or started here."""
    # Fall back to a default configuration when none (or a falsy one) is given.
    if not config:
        config = ConnectionPoolConfig()
    self._config = config
    self._base_uri = base_uri
    self._manager_kwargs = manager_kwargs if manager_kwargs else {}

    # Pooled connections, grouped by the URI they are connected to.
    self._connections: dict[str, list[_PooledConnectionState]] = {}
    self._connection_count = 0

    # One permit per connection that may be acquired at the same time.
    self._semaphore = asyncio.Semaphore(self._config.max_connections)

    # Lifecycle flag and the lock guarding the connection storage.
    self._closed = False
    self._lock = asyncio.Lock()

    # Handles of the background tasks; unset until they are started.
    self._health_check_task: asyncio.Task[None] | None = None
    self._cleanup_task: asyncio.Task[None] | None = None

acquire async

acquire(
    uri_or_path: str, timeout: float | None = None
) -> PooledConnection[T]

Acquire a connection from the pool.

If a matching idle connection exists, it will be reused. Otherwise, a new connection will be created.

Parameters:

Name Type Description Default
uri_or_path str

Full URI or path relative to base_uri.

required
timeout float | None

Timeout for acquiring. Uses config default if not provided.

None

Returns:

Type Description
PooledConnection[T]

A PooledConnection wrapper.

Raises:

Type Description
PoolClosedError

If the pool has been closed.

PoolExhaustedError

If no connection available within timeout.

ConnectionError

If connection fails.

Source code in src/jetsocket/pool.py
async def acquire(
    self,
    uri_or_path: str,
    timeout: float | None = None,
) -> PooledConnection[T]:
    """Acquire a connection from the pool.

    If a matching idle connection exists, it will be reused.
    Otherwise, a new connection will be created.

    Args:
        uri_or_path: Full URI or path relative to base_uri.
        timeout: Timeout for acquiring. Uses config default if not provided.

    Returns:
        A PooledConnection wrapper.

    Raises:
        PoolClosedError: If the pool has been closed, including while this
            call was waiting for a free slot.
        PoolExhaustedError: If no connection available within timeout.
        ConnectionError: If connection fails.
    """
    if self._closed:
        raise PoolClosedError("Pool has been closed")

    timeout = timeout if timeout is not None else self._config.acquire_timeout

    # Resolve full URI (before taking a permit, so a bad path costs nothing)
    full_uri = self._resolve_uri(uri_or_path)

    # Wait for a free slot, bounded by the timeout
    try:
        await asyncio.wait_for(self._acquire_semaphore(), timeout=timeout)
    except asyncio.TimeoutError:
        raise PoolExhaustedError(
            f"Could not acquire connection within {timeout}s",
            max_connections=self._config.max_connections,
            timeout=timeout,
        ) from None

    try:
        # The pool may have been closed while we were waiting for a slot.
        if self._closed:
            raise PoolClosedError("Pool has been closed")

        dead_state = None

        # Look up and claim an idle connection atomically under the lock.
        async with self._lock:
            conn_state = self._get_idle_connection(full_uri)
            if conn_state is not None:
                if conn_state.manager.state in (
                    ConnectionState.CONNECTED,
                    ConnectionState.RECONNECTING,
                    ConnectionState.BACKING_OFF,
                ):
                    conn_state.mark_used()
                    return PooledConnection[T](self, conn_state)

                # Connection is not usable: drop it from the pool
                self._remove_connection(conn_state)
                dead_state = conn_state

        # Close the discarded connection outside the lock so its resources
        # are not leaked.
        if dead_state is not None:
            await self._close_connection(dead_state)

        # Create new connection
        conn_state = await self._create_connection(full_uri)
        conn_state.mark_used()
        return PooledConnection[T](self, conn_state)

    except BaseException:
        # Give the permit back on any failure. BaseException (not just
        # Exception) so that task cancellation does not leak a permit.
        self._semaphore.release()
        raise

release async

release(conn: PooledConnection[T]) -> None

Release a connection back to the pool.

Parameters:

Name Type Description Default
conn PooledConnection[T]

The pooled connection to release.

required
Source code in src/jetsocket/pool.py
async def release(self, conn: PooledConnection[T]) -> None:
    """Hand a previously acquired connection back to the pool.

    Args:
        conn: The pooled connection to release.
    """
    pooled = conn._state
    pooled.mark_released()

    # Free the slot for the next acquirer.
    self._semaphore.release()

    # A healthy connection in an open pool simply stays available for reuse.
    if not self._closed and not pooled.manager.state.is_terminal:
        return

    # Otherwise the connection is closed and dropped from the pool.
    await self._close_connection(pooled)
    async with self._lock:
        self._remove_connection(pooled)

close async

close() -> None

Close the pool and all connections.

After closing, no new connections can be acquired.

Source code in src/jetsocket/pool.py
async def close(self) -> None:
    """Shut the pool down: stop background tasks and close every connection.

    Once this has run, no new connections can be acquired.
    """
    self._closed = True

    # Stop the background tasks one after the other and forget their handles.
    for slot in ("_health_check_task", "_cleanup_task"):
        background = getattr(self, slot)
        if background is None:
            continue
        background.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await background
        setattr(self, slot, None)

    # Close all stored connections concurrently while holding the lock.
    async with self._lock:
        pending = [
            asyncio.create_task(self._close_connection(entry))
            for bucket in self._connections.values()
            for entry in bucket
        ]

        if pending:
            # Failures of individual closes are collected, not raised.
            await asyncio.gather(*pending, return_exceptions=True)

        self._connections.clear()

stats

stats() -> PoolStats

Get aggregated pool statistics.

Returns:

Type Description
PoolStats

A PoolStats instance with current statistics.

Source code in src/jetsocket/pool.py
def stats(self) -> PoolStats:
    """Summarise the pool's connections and their message counters.

    Returns:
        A PoolStats instance with current statistics.
    """
    in_use_count = 0
    idle_count = 0
    sent = received = reconnects = 0

    # Walk every stored connection, whatever URI it belongs to.
    pooled = (entry for bucket in self._connections.values() for entry in bucket)
    for entry in pooled:
        if not entry.in_use:
            idle_count += 1
        else:
            in_use_count += 1

        snapshot = entry.manager.stats()
        sent += snapshot.messages_sent
        received += snapshot.messages_received
        reconnects += snapshot.reconnect_count

    return PoolStats(
        total_connections=self._connection_count,
        active_connections=in_use_count,
        idle_connections=idle_count,
        total_messages_sent=sent,
        total_messages_received=received,
        total_reconnects=reconnects,
    )

Configuration for connection pooling.

Parameters:

Name Type Description Default
max_connections int

Maximum concurrent connections in the pool.

10
max_idle_time float

Close idle connections after this many seconds.

300.0
health_check_interval float

Interval between health checks in seconds.

30.0
acquire_timeout float

Timeout for acquiring a connection in seconds.

10.0
Source code in src/jetsocket/pool.py
@dataclass(frozen=True, slots=True)
class ConnectionPoolConfig:
    """Configuration for connection pooling.

    Args:
        max_connections: Maximum concurrent connections in the pool.
        max_idle_time: Close idle connections after this many seconds.
        health_check_interval: Interval between health checks in seconds.
        acquire_timeout: Timeout for acquiring a connection in seconds.
    """

    max_connections: int = 10
    max_idle_time: float = 300.0
    health_check_interval: float = 30.0
    acquire_timeout: float = 10.0

    def __post_init__(self) -> None:
        """Validate configuration."""
        if self.max_connections <= 0:
            msg = f"max_connections must be positive, got {self.max_connections}"
            raise ValueError(msg)
        if self.max_idle_time <= 0:
            msg = f"max_idle_time must be positive, got {self.max_idle_time}"
            raise ValueError(msg)
        if self.health_check_interval <= 0:
            msg = f"health_check_interval must be positive, got {self.health_check_interval}"
            raise ValueError(msg)
        if self.acquire_timeout <= 0:
            msg = f"acquire_timeout must be positive, got {self.acquire_timeout}"
            raise ValueError(msg)

__post_init__

__post_init__() -> None

Validate configuration.

Source code in src/jetsocket/pool.py
def __post_init__(self) -> None:
    """Reject the first option, in declaration order, that is not positive."""
    for option in (
        "max_connections",
        "max_idle_time",
        "health_check_interval",
        "acquire_timeout",
    ):
        value = getattr(self, option)
        if value <= 0:
            raise ValueError(f"{option} must be positive, got {value}")

Usage

from jetsocket import ConnectionPool, ConnectionPoolConfig

config = ConnectionPoolConfig(
    max_connections=10,
    max_idle_time=300.0,
    health_check_interval=30.0,
    acquire_timeout=10.0,
)

async with ConnectionPool(config, base_uri="wss://stream.example.com") as pool:
    async with await pool.acquire("/ws/btcusdt") as conn:
        await conn.send({"subscribe": "trades"})
        async for msg in conn:
            print(msg)

Configuration Options

Option Default Description
max_connections 10 Maximum concurrent connections
max_idle_time 300.0 Close idle connections after N seconds
health_check_interval 30.0 Interval between health checks
acquire_timeout 10.0 Timeout for acquiring a connection

Pool Statistics

stats = pool.stats()
print(f"Total: {stats.total_connections}")
print(f"Active: {stats.active_connections}")
print(f"Idle: {stats.idle_connections}")

Pool Exhaustion

When all connections are in use:

from jetsocket import PoolExhaustedError

try:
    async with await pool.acquire("/ws", timeout=5.0) as conn:
        ...
except PoolExhaustedError as e:
    print(f"Pool exhausted: {e}")