Source code for tierproxy.proxy.selector

"""Client-side smart selector. Reads /v1/health/upstreams + picks one
based on cost / latency / success-rate, then emits an upstream-hint."""

from __future__ import annotations

import time
from typing import TYPE_CHECKING, Literal

if TYPE_CHECKING:
    from tierproxy.async_client import AsyncTierProxy
    from tierproxy.client import TierProxy
    from tierproxy.resources.health import UpstreamHealth

Strategy = Literal["cheapest", "fastest", "most_reliable", "balanced"]


def pick(upstreams: list[UpstreamHealth], strategy: Strategy = "balanced") -> UpstreamHealth:
    """Select the best upstream for the requested strategy.

    Upstreams whose ``state`` is ``"red"`` or whose ``cb_state`` is ``"open"``
    are skipped. If that leaves nothing to choose from, the upstream with the
    highest ``success_rate`` among *all* given upstreams is returned instead.

    Strategies:
        * ``"cheapest"`` -- lowest ``cost_per_gb_usd``.
        * ``"fastest"`` -- lowest ``latency_p95_ms``.
        * ``"most_reliable"`` -- highest ``success_rate``.
        * anything else, including the default ``"balanced"`` -- highest
          ``success_rate / (max(0.01, cost) * (1 + latency_ms / 1000))``.

    Raises:
        ValueError: If ``upstreams`` is empty.
    """

    def by_cost(entry: UpstreamHealth) -> float:
        return entry.cost_per_gb_usd

    def by_latency(entry: UpstreamHealth) -> float:
        return entry.latency_p95_ms

    def by_success(entry: UpstreamHealth) -> float:
        return entry.success_rate

    def by_balance(entry: UpstreamHealth) -> float:
        # Cost is floored at 0.01 so a free upstream cannot divide by zero.
        cost_floor = max(0.01, entry.cost_per_gb_usd)
        latency_factor = 1 + entry.latency_p95_ms / 1000
        return entry.success_rate / (cost_floor * latency_factor)

    usable = [
        entry
        for entry in upstreams
        if not (entry.state == "red" or entry.cb_state == "open")
    ]

    if not usable:
        # Everything is unhealthy: fall back to the least-bad upstream.
        if upstreams:
            return max(upstreams, key=by_success)
        raise ValueError("No upstreams available")

    if strategy == "cheapest":
        chooser, metric = min, by_cost
    elif strategy == "fastest":
        chooser, metric = min, by_latency
    elif strategy == "most_reliable":
        chooser, metric = max, by_success
    else:
        chooser, metric = max, by_balance
    return chooser(usable, key=metric)
def pick_next(
    upstreams: list[UpstreamHealth],
    exclude: set[str],
    strategy: Strategy = "balanced",
) -> UpstreamHealth | None:
    """Choose the best upstream that has not been tried yet.

    Upstreams whose ``upstream_id`` is in ``exclude`` are dropped, and the
    rest are handed to :func:`pick` with the given ``strategy``.

    Returns:
        The selected upstream, or ``None`` when every upstream has already
        been tried (or ``upstreams`` is empty).
    """
    untried = [entry for entry in upstreams if entry.upstream_id not in exclude]
    return pick(untried, strategy) if untried else None
class SmartSelector:
    """Convenience wrapper that keeps a cached upstream health snapshot.

    The snapshot comes from ``client.health.upstreams()`` and is reused until
    it is older than ``cache_ttl`` seconds. An empty (or missing) snapshot is
    never reused: the next call fetches again.
    """

    def __init__(
        self,
        client: TierProxy,
        strategy: Strategy = "balanced",
        cache_ttl: float = 30.0,
    ) -> None:
        """Store the client and selection settings; no request is made here.

        Args:
            client: Client whose ``health.upstreams()`` supplies the snapshot.
            strategy: Strategy passed to the module-level ``pick`` helpers.
            cache_ttl: Maximum snapshot age, in seconds, before a re-fetch.
        """
        self._client = client
        self._strategy: Strategy = strategy
        self._ttl = cache_ttl
        self._cache: list[UpstreamHealth] | None = None
        # time.monotonic() reading taken at the last fetch. Only meaningful
        # once _cache has been populated.
        self._fetched_at: float = 0.0

    def _refresh_if_stale(self) -> list[UpstreamHealth]:
        """Return the snapshot, re-fetching it if missing, empty, or expired."""
        # Age is measured on the monotonic clock so wall-clock adjustments
        # (NTP steps, manual changes) cannot expire the cache early or keep
        # a stale snapshot alive indefinitely.
        if not self._cache or time.monotonic() - self._fetched_at > self._ttl:
            self._cache = self._client.health.upstreams()
            self._fetched_at = time.monotonic()
        return self._cache

    def pick(self) -> UpstreamHealth:
        """Pick the best upstream from the (possibly refreshed) snapshot."""
        return pick(self._refresh_if_stale(), self._strategy)

    def pick_next(self, exclude: set[str]) -> UpstreamHealth | None:
        """Pick the best upstream not in ``exclude``; ``None`` if exhausted."""
        return pick_next(self._refresh_if_stale(), exclude, self._strategy)
class AsyncSmartSelector:
    """Async counterpart of the cached selector.

    The snapshot comes from ``await client.health.upstreams()`` and is reused
    until it is older than ``cache_ttl`` seconds. An empty (or missing)
    snapshot is never reused: the next call fetches again.
    """

    def __init__(
        self,
        client: AsyncTierProxy,
        strategy: Strategy = "balanced",
        cache_ttl: float = 30.0,
    ) -> None:
        """Store the client and selection settings; no request is made here.

        Args:
            client: Async client whose ``health.upstreams()`` is awaited to
                obtain the snapshot.
            strategy: Strategy passed to the module-level ``pick`` helpers.
            cache_ttl: Maximum snapshot age, in seconds, before a re-fetch.
        """
        self._client = client
        self._strategy: Strategy = strategy
        self._ttl = cache_ttl
        self._cache: list[UpstreamHealth] | None = None
        # time.monotonic() reading taken at the last fetch. Only meaningful
        # once _cache has been populated.
        self._fetched_at: float = 0.0

    async def _refresh_if_stale(self) -> list[UpstreamHealth]:
        """Return the snapshot, re-fetching it if missing, empty, or expired."""
        # Age is measured on the monotonic clock so wall-clock adjustments
        # (NTP steps, manual changes) cannot expire the cache early or keep
        # a stale snapshot alive indefinitely.
        if not self._cache or time.monotonic() - self._fetched_at > self._ttl:
            self._cache = await self._client.health.upstreams()
            self._fetched_at = time.monotonic()
        return self._cache

    async def pick(self) -> UpstreamHealth:
        """Pick the best upstream from the (possibly refreshed) snapshot."""
        return pick(await self._refresh_if_stale(), self._strategy)

    async def pick_next(self, exclude: set[str]) -> UpstreamHealth | None:
        """Pick the best upstream not in ``exclude``; ``None`` if exhausted."""
        return pick_next(await self._refresh_if_stale(), exclude, self._strategy)